Revert "Use Rust erasure library and turn on erasure (#3768)" (#3827)

This reverts commit b9bb5af4a5.
sakridge 2019-04-17 12:52:12 -07:00 committed by GitHub
parent 2518e95fb0
commit 4b8cb72977
12 changed files with 758 additions and 631 deletions

Cargo.lock (generated)

@ -1877,17 +1877,6 @@ dependencies = [
"redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "reed-solomon-erasure"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.1.2" version = "1.1.2"
@ -2181,7 +2170,6 @@ dependencies = [
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)",
"ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)", "ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3585,7 +3573,6 @@ dependencies = [
"checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85" "checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85"
"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
"checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828" "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828"
"checksum reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77cbbd4c02f53e345fe49e74255a1b10080731ffb2a03475e11df7fc8a043c37"
"checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f" "checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f"
"checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861" "checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861"
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"


@ -40,7 +40,6 @@ nix = "0.13.0"
rand = "0.6.5" rand = "0.6.5"
rand_chacha = "0.1.1" rand_chacha = "0.1.1"
rayon = "1.0.0" rayon = "1.0.0"
reed-solomon-erasure = "3.1.1"
reqwest = "0.9.11" reqwest = "0.9.11"
ring = "0.13.2" ring = "0.13.2"
rocksdb = "0.11.0" rocksdb = "0.11.0"


@ -24,8 +24,9 @@ fn main() {
let chacha = !env::var("CARGO_FEATURE_CHACHA").is_err(); let chacha = !env::var("CARGO_FEATURE_CHACHA").is_err();
let cuda = !env::var("CARGO_FEATURE_CUDA").is_err(); let cuda = !env::var("CARGO_FEATURE_CUDA").is_err();
let erasure = !env::var("CARGO_FEATURE_ERASURE").is_err();
if chacha || cuda { if chacha || cuda || erasure {
println!("cargo:rerun-if-changed={}", perf_libs_dir); println!("cargo:rerun-if-changed={}", perf_libs_dir);
println!("cargo:rustc-link-search=native={}", perf_libs_dir); println!("cargo:rustc-link-search=native={}", perf_libs_dir);
} }
@ -45,4 +46,30 @@ fn main() {
println!("cargo:rustc-link-lib=dylib=cuda"); println!("cargo:rustc-link-lib=dylib=cuda");
println!("cargo:rustc-link-lib=dylib=cudadevrt"); println!("cargo:rustc-link-lib=dylib=cudadevrt");
} }
if erasure {
#[cfg(any(target_os = "macos", target_os = "ios"))]
{
println!(
"cargo:rerun-if-changed={}/libgf_complete.dylib",
perf_libs_dir
);
println!("cargo:rerun-if-changed={}/libJerasure.dylib", perf_libs_dir);
}
#[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))]
{
println!("cargo:rerun-if-changed={}/libgf_complete.so", perf_libs_dir);
println!("cargo:rerun-if-changed={}/libJerasure.so", perf_libs_dir);
}
#[cfg(windows)]
{
println!(
"cargo:rerun-if-changed={}/libgf_complete.dll",
perf_libs_dir
);
println!("cargo:rerun-if-changed={}/libJerasure.dll", perf_libs_dir);
}
println!("cargo:rustc-link-lib=dylib=Jerasure");
println!("cargo:rustc-link-lib=dylib=gf_complete");
}
} }
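
The restored build.rs picks which libJerasure/libgf_complete file names to watch using #[cfg(target_os = ...)] blocks, which are evaluated for the machine that compiles the build script. As an illustrative sketch only (not part of this change), the same dispatch can be driven by the environment variables Cargo sets for build scripts: CARGO_FEATURE_ERASURE is present when the crate is built with the `erasure` feature, and CARGO_CFG_TARGET_OS names the compilation target. The perf_libs_dir value below is a placeholder.

use std::env;

fn main() {
    // Set by Cargo only when the crate is built with `--features erasure`.
    let erasure = env::var("CARGO_FEATURE_ERASURE").is_ok();
    if erasure {
        // CARGO_CFG_TARGET_OS describes the compilation target, not the host.
        let suffix = match env::var("CARGO_CFG_TARGET_OS").as_deref() {
            Ok("macos") | Ok("ios") => "dylib",
            Ok("windows") => "dll",
            _ => "so",
        };
        let perf_libs_dir = "target/perf-libs"; // placeholder path for this sketch
        println!("cargo:rerun-if-changed={}/libgf_complete.{}", perf_libs_dir, suffix);
        println!("cargo:rerun-if-changed={}/libJerasure.{}", perf_libs_dir, suffix);
        println!("cargo:rustc-link-lib=dylib=Jerasure");
        println!("cargo:rustc-link-lib=dylib=gf_complete");
    }
}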


@ -3,6 +3,7 @@
//! access read to a persistent file-based ledger. //! access read to a persistent file-based ledger.
use crate::entry::Entry; use crate::entry::Entry;
#[cfg(feature = "erasure")]
use crate::erasure; use crate::erasure;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
@ -16,6 +17,7 @@ use hashbrown::HashMap;
#[cfg(not(feature = "kvstore"))] #[cfg(not(feature = "kvstore"))]
use rocksdb; use rocksdb;
#[cfg(feature = "erasure")]
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
@ -77,9 +79,9 @@ pub struct Blocktree {
meta_cf: LedgerColumn<cf::SlotMeta>, meta_cf: LedgerColumn<cf::SlotMeta>,
data_cf: LedgerColumn<cf::Data>, data_cf: LedgerColumn<cf::Data>,
erasure_cf: LedgerColumn<cf::Coding>, erasure_cf: LedgerColumn<cf::Coding>,
#[cfg(feature = "erasure")]
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>, erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
orphans_cf: LedgerColumn<cf::Orphans>, orphans_cf: LedgerColumn<cf::Orphans>,
session: Arc<erasure::Session>,
pub new_blobs_signals: Vec<SyncSender<bool>>, pub new_blobs_signals: Vec<SyncSender<bool>>,
pub root_slot: RwLock<u64>, pub root_slot: RwLock<u64>,
} }
@ -90,6 +92,7 @@ pub const META_CF: &str = "meta";
pub const DATA_CF: &str = "data"; pub const DATA_CF: &str = "data";
// Column family for erasure data // Column family for erasure data
pub const ERASURE_CF: &str = "erasure"; pub const ERASURE_CF: &str = "erasure";
#[cfg(feature = "erasure")]
pub const ERASURE_META_CF: &str = "erasure_meta"; pub const ERASURE_META_CF: &str = "erasure_meta";
// Column family for orphans data // Column family for orphans data
pub const ORPHANS_CF: &str = "orphans"; pub const ORPHANS_CF: &str = "orphans";
@ -113,7 +116,7 @@ impl Blocktree {
// Create the erasure column family // Create the erasure column family
let erasure_cf = LedgerColumn::new(&db); let erasure_cf = LedgerColumn::new(&db);
#[cfg(feature = "erasure")]
let erasure_meta_cf = LedgerColumn::new(&db); let erasure_meta_cf = LedgerColumn::new(&db);
// Create the orphans column family. An "orphan" is defined as // Create the orphans column family. An "orphan" is defined as
@ -121,17 +124,14 @@ impl Blocktree {
// known parent // known parent
let orphans_cf = LedgerColumn::new(&db); let orphans_cf = LedgerColumn::new(&db);
// setup erasure
let session = Arc::new(erasure::Session::default());
Ok(Blocktree { Ok(Blocktree {
db, db,
meta_cf, meta_cf,
data_cf, data_cf,
erasure_cf, erasure_cf,
#[cfg(feature = "erasure")]
erasure_meta_cf, erasure_meta_cf,
orphans_cf, orphans_cf,
session,
new_blobs_signals: vec![], new_blobs_signals: vec![],
root_slot: RwLock::new(0), root_slot: RwLock::new(0),
}) })
@ -259,6 +259,7 @@ impl Blocktree {
// A map from slot to a 2-tuple of metadata: (working copy, backup copy), // A map from slot to a 2-tuple of metadata: (working copy, backup copy),
// so we can detect changes to the slot metadata later // so we can detect changes to the slot metadata later
let mut slot_meta_working_set = HashMap::new(); let mut slot_meta_working_set = HashMap::new();
#[cfg(feature = "erasure")]
let mut erasure_meta_working_set = HashMap::new(); let mut erasure_meta_working_set = HashMap::new();
let new_blobs: Vec<_> = new_blobs.into_iter().collect(); let new_blobs: Vec<_> = new_blobs.into_iter().collect();
let mut prev_inserted_blob_datas = HashMap::new(); let mut prev_inserted_blob_datas = HashMap::new();
@ -300,17 +301,20 @@ impl Blocktree {
continue; continue;
} }
let set_index = ErasureMeta::set_index_for(blob.index()); #[cfg(feature = "erasure")]
let erasure_meta_entry = erasure_meta_working_set {
.entry((blob_slot, set_index)) let set_index = ErasureMeta::set_index_for(blob.index());
.or_insert_with(|| { let erasure_meta_entry = erasure_meta_working_set
self.erasure_meta_cf .entry((blob_slot, set_index))
.get((blob_slot, set_index)) .or_insert_with(|| {
.expect("Expect database get to succeed") self.erasure_meta_cf
.unwrap_or_else(|| ErasureMeta::new(set_index)) .get((blob_slot, set_index))
}); .expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index))
});
erasure_meta_entry.set_data_present(blob.index(), true); erasure_meta_entry.set_data_present(blob.index());
}
let _ = self.insert_data_blob( let _ = self.insert_data_blob(
blob, blob,
@ -335,8 +339,11 @@ impl Blocktree {
} }
} }
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { #[cfg(feature = "erasure")]
write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?; {
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
}
} }
self.db.write(write_batch)?; self.db.write(write_batch)?;
@ -347,10 +354,35 @@ impl Blocktree {
} }
} }
#[cfg(feature = "erasure")]
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() { for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() {
if erasure_meta.can_recover() { if erasure_meta.can_recover() {
let amount_recovered = self.recover(slot, set_index)?; match self.recover(slot, set_index) {
inc_new_counter_info!("erasures-recovered", amount_recovered); Ok(recovered) => {
inc_new_counter_info!("erasures-recovered", recovered);
}
Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
let mut erasure_meta = self
.erasure_meta_cf
.get((slot, set_index))?
.expect("erasure meta should exist");
let mut batch = self.db.batch()?;
let start_index = erasure_meta.start_index();
let (_, coding_end_idx) = erasure_meta.end_indexes();
erasure_meta.coding = 0;
batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
for idx in start_index..coding_end_idx {
batch.delete::<cf::Coding>((slot, idx))?;
}
self.db.write(batch)?;
}
Err(e) => return Err(e),
}
} }
} }
@ -421,42 +453,26 @@ impl Blocktree {
pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> { pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.erasure_cf.get_bytes((slot, index)) self.erasure_cf.get_bytes((slot, index))
} }
pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
let set_index = ErasureMeta::set_index_for(index); self.erasure_cf.delete((slot, index))
let mut erasure_meta = self
.erasure_meta_cf
.get((slot, set_index))?
.unwrap_or_else(|| ErasureMeta::new(set_index));
erasure_meta.set_coding_present(index, false);
let mut batch = self.db.batch()?;
batch.delete::<cf::Coding>((slot, index))?;
batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
self.db.write(batch)?;
Ok(())
} }
pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> { pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.data_cf.get_bytes((slot, index)) self.data_cf.get_bytes((slot, index))
} }
/// For benchmarks, testing, and setup.
/// Does no metadata tracking. Use with care.
pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
self.data_cf.put_bytes((slot, index), bytes)
}
pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
self.erasure_cf.put_bytes((slot, index), bytes) self.erasure_cf.put_bytes((slot, index), bytes)
} }
#[cfg(not(feature = "erasure"))]
#[inline]
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
self.put_coding_blob_bytes_raw(slot, index, bytes)
}
/// this function will insert coding blobs and also automatically track erasure-related /// this function will insert coding blobs and also automatically track erasure-related
/// metadata. If recovery is available it will be done /// metadata. If recovery is available it will be done
#[cfg(feature = "erasure")]
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
let set_index = ErasureMeta::set_index_for(index); let set_index = ErasureMeta::set_index_for(index);
let mut erasure_meta = self let mut erasure_meta = self
@ -464,7 +480,7 @@ impl Blocktree {
.get((slot, set_index))? .get((slot, set_index))?
.unwrap_or_else(|| ErasureMeta::new(set_index)); .unwrap_or_else(|| ErasureMeta::new(set_index));
erasure_meta.set_coding_present(index, true); erasure_meta.set_coding_present(index);
let mut writebatch = self.db.batch()?; let mut writebatch = self.db.batch()?;
@ -475,13 +491,42 @@ impl Blocktree {
self.db.write(writebatch)?; self.db.write(writebatch)?;
if erasure_meta.can_recover() { if erasure_meta.can_recover() {
let amount_recovered = self.recover(slot, set_index)?; match self.recover(slot, set_index) {
inc_new_counter_info!("erasures-recovered", amount_recovered); Ok(recovered) => {
inc_new_counter_info!("erasures-recovered", recovered);
return Ok(());
}
Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
let start_index = erasure_meta.start_index();
let (_, coding_end_idx) = erasure_meta.end_indexes();
let mut batch = self.db.batch()?;
erasure_meta.coding = 0;
batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
for idx in start_index..coding_end_idx {
batch.delete::<cf::Coding>((slot, idx as u64))?;
}
self.db.write(batch)?;
return Ok(());
}
Err(e) => return Err(e),
}
} }
Ok(()) Ok(())
} }
pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> {
self.data_cf.put_bytes((slot, index), value)
}
pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
self.data_cf.put_bytes((slot, index), bytes)
}
pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result<Option<Blob>> { pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result<Option<Blob>> {
let bytes = self.get_data_blob_bytes(slot, blob_index)?; let bytes = self.get_data_blob_bytes(slot, blob_index)?;
Ok(bytes.map(|bytes| { Ok(bytes.map(|bytes| {
@ -581,6 +626,20 @@ impl Blocktree {
} }
} }
pub fn find_missing_coding_indexes(
&self,
slot: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
if let Ok(mut db_iterator) = self.erasure_cf.cursor() {
Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
} else {
vec![]
}
}
/// Returns the entry vector for the slot starting with `blob_start_index` /// Returns the entry vector for the slot starting with `blob_start_index`
pub fn get_slot_entries( pub fn get_slot_entries(
&self, &self,
@ -1029,45 +1088,43 @@ impl Blocktree {
Ok(()) Ok(())
} }
#[cfg(feature = "erasure")]
/// Attempts recovery using erasure coding /// Attempts recovery using erasure coding
fn recover(&self, slot: u64, set_index: u64) -> Result<usize> { fn recover(&self, slot: u64, set_index: u64) -> Result<usize> {
use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA}; use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA};
use crate::packet::BLOB_DATA_SIZE;
let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap(); let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap();
let start_idx = erasure_meta.start_index(); let start_idx = erasure_meta.start_index();
let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes();
let present = &mut [true; ERASURE_SET_SIZE]; let mut erasures = Vec::with_capacity(NUM_CODING + 1);
let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); let (mut data, mut coding) = (vec![], vec![]);
let mut size = 0; let mut size = 0;
for i in start_idx..coding_end_idx { for i in start_idx..coding_end_idx {
if erasure_meta.is_coding_present(i) { if erasure_meta.is_coding_present(i) {
let mut blob_bytes = self let blob_bytes = self
.erasure_cf .erasure_cf
.get_bytes((slot, i))? .get_bytes((slot, i))?
.expect("erasure_meta must have no false positives"); .expect("erasure_meta must have no false positives");
blob_bytes.drain(..BLOB_HEADER_SIZE);
if size == 0 { if size == 0 {
size = blob_bytes.len(); size = blob_bytes.len() - BLOB_HEADER_SIZE;
} }
blobs.push(blob_bytes); coding.push(blob_bytes);
} else { } else {
let set_relative_idx = (i - start_idx) as usize + NUM_DATA; let set_relative_idx = (i - start_idx) + NUM_DATA as u64;
blobs.push(vec![0; size]); coding.push(vec![0; crate::packet::BLOB_SIZE]);
present[set_relative_idx] = false; erasures.push(set_relative_idx as i32);
} }
} }
assert_ne!(size, 0); assert_ne!(size, 0);
for i in start_idx..data_end_idx { for i in start_idx..data_end_idx {
let set_relative_idx = (i - start_idx) as usize;
if erasure_meta.is_data_present(i) { if erasure_meta.is_data_present(i) {
let mut blob_bytes = self let mut blob_bytes = self
.data_cf .data_cf
@ -1075,28 +1132,90 @@ impl Blocktree {
.expect("erasure_meta must have no false positives"); .expect("erasure_meta must have no false positives");
// If data is too short, extend it with zeroes // If data is too short, extend it with zeroes
blob_bytes.resize(size, 0u8); if blob_bytes.len() < size {
blob_bytes.resize(size, 0u8);
}
blobs.insert(set_relative_idx, blob_bytes); data.push(blob_bytes);
} else { } else {
blobs.insert(set_relative_idx, vec![0u8; size]); let set_relative_index = i - start_idx;
data.push(vec![0; size]);
// data erasures must come before any coding erasures if present // data erasures must come before any coding erasures if present
present[set_relative_idx] = false; erasures.insert(0, set_relative_index as i32);
} }
} }
let (recovered_data, recovered_coding) = self let mut coding_ptrs: Vec<_> = coding
.session .iter_mut()
.reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size])
.collect();
let amount_recovered = recovered_data.len() + recovered_coding.len(); let mut data_ptrs: Vec<_> = data
.iter_mut()
.map(|data_bytes| &mut data_bytes[..size])
.collect();
trace!( // Marks the end
"[recover] reconstruction OK slot: {}, indexes: [{},{})", erasures.push(-1);
slot, trace!("erasures: {:?}, size: {}", erasures, size);
start_idx,
data_end_idx erasure::decode_blocks(
); data_ptrs.as_mut_slice(),
coding_ptrs.as_mut_slice(),
&erasures,
)?;
// Create the missing blobs from the reconstructed data
let block_start_idx = erasure_meta.start_index();
let (mut recovered_data, mut recovered_coding) = (vec![], vec![]);
for i in &erasures[..erasures.len() - 1] {
let n = *i as usize;
let (data_size, idx, first_byte);
if n < NUM_DATA {
let mut blob = Blob::new(&data_ptrs[n]);
idx = n as u64 + block_start_idx;
data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
first_byte = blob.data[0];
if data_size > BLOB_DATA_SIZE {
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
return Err(Error::ErasureError(ErasureError::CorruptCoding));
}
blob.set_slot(slot);
blob.set_index(idx);
blob.set_size(data_size);
recovered_data.push(blob);
} else {
let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]);
idx = (n - NUM_DATA) as u64 + block_start_idx;
data_size = size;
first_byte = blob.data[0];
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
return Err(Error::ErasureError(ErasureError::CorruptCoding));
}
blob.set_slot(slot);
blob.set_index(idx);
blob.set_data_size(data_size as u64);
recovered_coding.push(blob);
}
trace!(
"erasures[{}] ({}) size: {} data[0]: {}",
*i,
idx,
data_size,
first_byte,
);
}
self.write_blobs(recovered_data)?; self.write_blobs(recovered_data)?;
@ -1104,7 +1223,7 @@ impl Blocktree {
self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?; self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?;
} }
Ok(amount_recovered) Ok(erasures.len() - 1)
} }
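
The restored recover() expresses missing blobs as a jerasure-style erasures list of set-relative indexes terminated by -1, whereas the reed-solomon-erasure code it replaces tracked the same information as a present: [bool; ERASURE_SET_SIZE] mask. A minimal sketch of the correspondence (an illustrative free function, not part of the diff); iterating in order keeps data erasures ahead of coding erasures, matching the ordering requirement noted above:

// ERASURE_SET_SIZE is NUM_DATA + NUM_CODING = 20 in this module.
fn present_to_erasures(present: &[bool]) -> Vec<i32> {
    let mut erasures = Vec::new();
    // Indexes 0..NUM_DATA are data blobs, NUM_DATA..ERASURE_SET_SIZE are coding
    // blobs, so scanning in order lists data erasures before coding erasures.
    for (i, &is_present) in present.iter().enumerate() {
        if !is_present {
            erasures.push(i as i32);
        }
    }
    erasures.push(-1); // jerasure_matrix_decode expects a -1 terminator
    erasures
}
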
/// Returns the next consumed index and the number of ticks in the new consumed /// Returns the next consumed index and the number of ticks in the new consumed
@ -1702,47 +1821,44 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive"); let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive");
{ {
let blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
for i in 0..4 { let slot = 0;
let slot = i; let parent_slot = 0;
let parent_slot = if i == 0 { 0 } else { i - 1 }; // Write entries
// Write entries let num_entries = 21 as u64;
let num_entries = 21 as u64 * (i + 1); let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
blocktree blocktree
.write_blobs(blobs.iter().skip(1).step_by(2)) .write_blobs(blobs.iter().skip(1).step_by(2))
.unwrap(); .unwrap();
assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
if num_entries % 2 == 0 { if num_entries % 2 == 0 {
assert_eq!(meta.received, num_entries);
} else {
debug!("got here");
assert_eq!(meta.received, num_entries - 1);
}
assert_eq!(meta.consumed, 0);
assert_eq!(meta.parent_slot, parent_slot);
if num_entries % 2 == 0 {
assert_eq!(meta.last_index, num_entries - 1);
} else {
assert_eq!(meta.last_index, std::u64::MAX);
}
blocktree.write_blobs(blobs.iter().step_by(2)).unwrap();
assert_eq!(
blocktree.get_slot_entries(slot, 0, None).unwrap(),
original_entries,
);
let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
assert_eq!(meta.received, num_entries); assert_eq!(meta.received, num_entries);
assert_eq!(meta.consumed, num_entries); } else {
assert_eq!(meta.parent_slot, parent_slot); assert_eq!(meta.received, num_entries - 1);
assert_eq!(meta.last_index, num_entries - 1);
} }
assert_eq!(meta.consumed, 0);
assert_eq!(meta.parent_slot, 0);
if num_entries % 2 == 0 {
assert_eq!(meta.last_index, num_entries - 1);
} else {
assert_eq!(meta.last_index, std::u64::MAX);
}
blocktree.write_blobs(blobs.iter().step_by(2)).unwrap();
assert_eq!(
blocktree.get_slot_entries(0, 0, None).unwrap(),
original_entries,
);
let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
assert_eq!(meta.received, num_entries);
assert_eq!(meta.consumed, num_entries);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_entries - 1);
} }
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -2549,6 +2665,7 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
} }
#[cfg(feature = "erasure")]
mod erasure { mod erasure {
use super::*; use super::*;
use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec};
@ -2613,7 +2730,7 @@ pub mod tests {
assert_eq!(erasure_meta.data, 0x00FF); assert_eq!(erasure_meta.data, 0x00FF);
assert_eq!(erasure_meta.coding, 0x0); assert_eq!(erasure_meta.coding, 0x0);
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); let mut coding_generator = CodingGenerator::new();
let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]);
for shared_coding_blob in coding_blobs { for shared_coding_blob in coding_blobs {
@ -2632,23 +2749,6 @@ pub mod tests {
assert_eq!(erasure_meta.data, 0xFFFF); assert_eq!(erasure_meta.data, 0xFFFF);
assert_eq!(erasure_meta.coding, 0x0F); assert_eq!(erasure_meta.coding, 0x0F);
let (start_idx, coding_end_idx) =
(erasure_meta.start_index(), erasure_meta.end_indexes().1);
for idx in start_idx..coding_end_idx {
blocktree.delete_coding_blob(slot, idx).unwrap();
}
let erasure_meta = blocktree
.erasure_meta_cf
.get((slot, 0))
.expect("DB get must succeed")
.unwrap();
assert!(!erasure_meta.can_recover());
assert_eq!(erasure_meta.data, 0xFFFF);
assert_eq!(erasure_meta.coding, 0x0);
} }
#[test] #[test]
@ -2666,12 +2766,11 @@ pub mod tests {
.map(Blob::into) .map(Blob::into)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); let mut coding_generator = CodingGenerator::new();
for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() {
let focused_index = (set_index + 1) * NUM_DATA - 1; let focused_index = (set_index + 1) * NUM_DATA - 1;
let coding_blobs = coding_generator.next(&data_blobs); let coding_blobs = coding_generator.next(&data_blobs);
assert_eq!(coding_blobs.len(), NUM_CODING); assert_eq!(coding_blobs.len(), NUM_CODING);
let deleted_data = data_blobs[NUM_DATA - 1].clone(); let deleted_data = data_blobs[NUM_DATA - 1].clone();
@ -2722,12 +2821,13 @@ pub mod tests {
Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction"); Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction");
} }
/// FIXME: JERASURE Threading: see Issue
/// [#3725](https://github.com/solana-labs/solana/issues/3725)
#[test] #[test]
fn test_recovery_multi_slot_multi_thread() { fn test_recovery_multi_slot_multi_thread() {
use rand::rngs::SmallRng;
use rand::SeedableRng;
use std::thread; use std::thread;
const USE_THREADS: bool = true;
let slots = vec![0, 3, 5, 50, 100]; let slots = vec![0, 3, 5, 50, 100];
let max_erasure_sets = 16; let max_erasure_sets = 16;
solana_logger::setup(); solana_logger::setup();
@ -2737,7 +2837,7 @@ pub mod tests {
// Specification should generate a ledger where each slot has an random number of // Specification should generate a ledger where each slot has an random number of
// erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones // erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones
// will have between 1 data blob missing and 1 coding blob // will have between 1-4 data blobs missing and all coding blobs
let specs = slots let specs = slots
.iter() .iter()
.map(|&slot| { .map(|&slot| {
@ -2748,7 +2848,7 @@ pub mod tests {
let (num_data, num_coding) = if set_index % 2 == 0 { let (num_data, num_coding) = if set_index % 2 == 0 {
(NUM_DATA - rng.gen_range(1, 5), NUM_CODING) (NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
} else { } else {
(NUM_DATA - 1, NUM_CODING - 1) (NUM_DATA, 0)
}; };
ErasureSpec { ErasureSpec {
set_index, set_index,
@ -2773,60 +2873,35 @@ pub mod tests {
for slot_model in model.clone() { for slot_model in model.clone() {
let blocktree = Arc::clone(&blocktree); let blocktree = Arc::clone(&blocktree);
let slot = slot_model.slot; let slot = slot_model.slot;
let mut rng = SmallRng::from_rng(&mut rng).unwrap(); let closure = move || {
let handle = thread::spawn(move || {
for erasure_set in slot_model.chunks { for erasure_set in slot_model.chunks {
// for even sets, write data blobs first, then write coding blobs, which blocktree
// should trigger recovery since all coding blobs will be inserted and .write_shared_blobs(erasure_set.data)
// between 1-4 data blobs are missing .expect("Writing data blobs must succeed");
if rng.gen() { debug!(
blocktree "multislot: wrote data: slot: {}, erasure_set: {}",
.write_shared_blobs(erasure_set.data) slot, erasure_set.set_index
.expect("Writing data blobs must succeed"); );
debug!(
"multislot: wrote data: slot: {}, erasure_set: {}",
slot, erasure_set.set_index
);
for shared_coding_blob in erasure_set.coding {
let blob = shared_coding_blob.read().unwrap();
let size = blob.size() + BLOB_HEADER_SIZE;
blocktree
.put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
.expect("Writing coding blobs must succeed");
}
debug!(
"multislot: wrote coding: slot: {}, erasure_set: {}",
slot, erasure_set.set_index
);
} else {
// for odd sets, write coding blobs first, then write the data blobs.
// writing the data blobs should trigger recovery, since 3/4 coding and
// 15/16 data blobs will be present
for shared_coding_blob in erasure_set.coding {
let blob = shared_coding_blob.read().unwrap();
let size = blob.size() + BLOB_HEADER_SIZE;
blocktree
.put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
.expect("Writing coding blobs must succeed");
}
debug!(
"multislot: wrote coding: slot: {}, erasure_set: {}",
slot, erasure_set.set_index
);
for shared_coding_blob in erasure_set.coding {
let blob = shared_coding_blob.read().unwrap();
let size = blob.size() + BLOB_HEADER_SIZE;
blocktree blocktree
.write_shared_blobs(erasure_set.data) .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
.expect("Writing data blobs must succeed"); .expect("Writing coding blobs must succeed");
debug!(
"multislot: wrote data: slot: {}, erasure_set: {}",
slot, erasure_set.set_index
);
} }
debug!(
"multislot: wrote coding: slot: {}, erasure_set: {}",
slot, erasure_set.set_index
);
} }
}); };
handles.push(handle); if USE_THREADS {
handles.push(thread::spawn(closure));
} else {
closure();
}
} }
handles handles


@ -28,6 +28,7 @@ pub mod columns {
/// Data Column /// Data Column
pub struct Data; pub struct Data;
#[cfg(feature = "erasure")]
#[derive(Debug)] #[derive(Debug)]
/// The erasure meta column /// The erasure meta column
pub struct ErasureMeta; pub struct ErasureMeta;


@ -138,6 +138,7 @@ impl TypedColumn<Kvs> for cf::SlotMeta {
type Type = super::SlotMeta; type Type = super::SlotMeta;
} }
#[cfg(feature = "erasure")]
impl Column<Kvs> for cf::ErasureMeta { impl Column<Kvs> for cf::ErasureMeta {
const NAME: &'static str = super::ERASURE_META_CF; const NAME: &'static str = super::ERASURE_META_CF;
type Index = (u64, u64); type Index = (u64, u64);
@ -156,6 +157,7 @@ impl Column<Kvs> for cf::ErasureMeta {
} }
} }
#[cfg(feature = "erasure")]
impl TypedColumn<Kvs> for cf::ErasureMeta { impl TypedColumn<Kvs> for cf::ErasureMeta {
type Type = super::ErasureMeta; type Type = super::ErasureMeta;
} }


@ -1,3 +1,4 @@
#[cfg(feature = "erasure")]
use crate::erasure::{NUM_CODING, NUM_DATA}; use crate::erasure::{NUM_CODING, NUM_DATA};
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
@ -58,6 +59,7 @@ impl SlotMeta {
} }
} }
#[cfg(feature = "erasure")]
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
/// Erasure coding information /// Erasure coding information
pub struct ErasureMeta { pub struct ErasureMeta {
@ -69,6 +71,7 @@ pub struct ErasureMeta {
pub coding: u64, pub coding: u64,
} }
#[cfg(feature = "erasure")]
impl ErasureMeta { impl ErasureMeta {
pub fn new(set_index: u64) -> ErasureMeta { pub fn new(set_index: u64) -> ErasureMeta {
ErasureMeta { ErasureMeta {
@ -88,56 +91,36 @@ impl ErasureMeta {
} }
pub fn is_coding_present(&self, index: u64) -> bool { pub fn is_coding_present(&self, index: u64) -> bool {
let start = self.start_index(); let set_index = Self::set_index_for(index);
let end = start + NUM_CODING as u64; let position = index - self.start_index();
if start <= index && index < end { set_index == self.set_index && self.coding & (1 << position) != 0
let position = index - start;
self.coding & (1 << position) != 0
} else {
false
}
} }
pub fn set_coding_present(&mut self, index: u64, present: bool) { pub fn set_coding_present(&mut self, index: u64) {
let set_index = Self::set_index_for(index); let set_index = Self::set_index_for(index);
if set_index as u64 == self.set_index { if set_index as u64 == self.set_index {
let position = index - self.start_index(); let position = index - self.start_index();
if present { self.coding |= 1 << position;
self.coding |= 1 << position;
} else {
self.coding &= !(1 << position);
}
} }
} }
pub fn is_data_present(&self, index: u64) -> bool { pub fn is_data_present(&self, index: u64) -> bool {
let start = self.start_index(); let set_index = Self::set_index_for(index);
let end = start + NUM_DATA as u64; let position = index - self.start_index();
if start <= index && index < end { set_index == self.set_index && self.data & (1 << position) != 0
let position = index - start;
self.data & (1 << position) != 0
} else {
false
}
} }
pub fn set_data_present(&mut self, index: u64, present: bool) { pub fn set_data_present(&mut self, index: u64) {
let set_index = Self::set_index_for(index); let set_index = Self::set_index_for(index);
if set_index as u64 == self.set_index { if set_index as u64 == self.set_index {
let position = index - self.start_index(); let position = index - self.start_index();
if present { self.data |= 1 << position;
self.data |= 1 << position;
} else {
self.data &= !(1 << position);
}
} }
} }
@ -156,29 +139,7 @@ impl ErasureMeta {
} }
} }
#[test] #[cfg(feature = "erasure")]
fn test_meta_indexes() {
use rand::{thread_rng, Rng};
let mut rng = thread_rng();
for _ in 0..100 {
let set_index = rng.gen_range(0, 1_000);
let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16);
assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
let e_meta = ErasureMeta::new(set_index);
assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64);
let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64);
assert_eq!(
coding_end_idx,
set_index * NUM_DATA as u64 + NUM_CODING as u64
);
}
}
#[test] #[test]
fn test_meta_coding_present() { fn test_meta_coding_present() {
let set_index = 0; let set_index = 0;
@ -189,7 +150,7 @@ fn test_meta_coding_present() {
}; };
for i in 0..NUM_CODING as u64 { for i in 0..NUM_CODING as u64 {
e_meta.set_coding_present(i, true); e_meta.set_coding_present(i);
assert_eq!(e_meta.is_coding_present(i), true); assert_eq!(e_meta.is_coding_present(i), true);
} }
for i in NUM_CODING as u64..NUM_DATA as u64 { for i in NUM_CODING as u64..NUM_DATA as u64 {
@ -199,7 +160,7 @@ fn test_meta_coding_present() {
e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64); e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64);
for i in (NUM_DATA * 17) as u64..((NUM_DATA * 17) + NUM_CODING) as u64 { for i in (NUM_DATA * 17) as u64..((NUM_DATA * 17) + NUM_CODING) as u64 {
e_meta.set_coding_present(i, true); e_meta.set_coding_present(i);
assert_eq!(e_meta.is_coding_present(i), true); assert_eq!(e_meta.is_coding_present(i), true);
} }
for i in (NUM_DATA * 17 + NUM_CODING) as u64..((NUM_DATA * 17) + NUM_DATA) as u64 { for i in (NUM_DATA * 17 + NUM_CODING) as u64..((NUM_DATA * 17) + NUM_DATA) as u64 {
@ -207,6 +168,7 @@ fn test_meta_coding_present() {
} }
} }
#[cfg(feature = "erasure")]
#[test] #[test]
fn test_can_recover() { fn test_can_recover() {
let set_index = 0; let set_index = 0;
@ -249,31 +211,3 @@ fn test_can_recover() {
e_meta.data = 0b1111_1111_1111_1000; e_meta.data = 0b1111_1111_1111_1000;
assert!(e_meta.can_recover()); assert!(e_meta.can_recover());
} }
#[test]
fn test_meta_data_present() {
let set_index = 0;
let mut e_meta = ErasureMeta {
set_index,
data: 0,
coding: 0,
};
for i in 0..NUM_DATA as u64 {
e_meta.set_data_present(i, true);
assert_eq!(e_meta.is_data_present(i), true);
}
for i in NUM_DATA as u64..2 * NUM_DATA as u64 {
assert_eq!(e_meta.is_data_present(i), false);
}
e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64);
for i in (NUM_DATA * 23) as u64..(NUM_DATA * 24) as u64 {
e_meta.set_data_present(i, true);
assert_eq!(e_meta.is_data_present(i), true);
}
for i in (NUM_DATA * 22) as u64..(NUM_DATA * 23) as u64 {
assert_eq!(e_meta.is_data_present(i), false);
}
}
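
For reference, the index arithmetic behind ErasureMeta, consistent with the test_meta_indexes test shown in the diff, reduces to integer division by NUM_DATA plus per-set bit positions. A minimal standalone sketch with illustrative free functions (not the real methods):

const NUM_DATA: u64 = 16; // data blobs per erasure set
const NUM_CODING: u64 = 4; // coding blobs per erasure set

/// Which erasure set a blob index belongs to.
fn set_index_for(blob_index: u64) -> u64 {
    blob_index / NUM_DATA
}

/// First blob index covered by a set.
fn start_index(set_index: u64) -> u64 {
    set_index * NUM_DATA
}

/// (one-past-the-end data index, one-past-the-end coding index) for a set.
fn end_indexes(set_index: u64) -> (u64, u64) {
    let start = start_index(set_index);
    (start + NUM_DATA, start + NUM_CODING)
}

/// Presence of blob `index` is recorded as one bit in a u64 mask,
/// shifted by the blob's offset within its set.
fn mark_present(mask: &mut u64, set_index: u64, index: u64) {
    if set_index_for(index) == set_index {
        *mask |= 1 << (index - start_index(set_index));
    }
}

fn main() {
    assert_eq!(set_index_for(37), 2); // 37 = 2 * 16 + 5
    assert_eq!(start_index(2), 32);
    assert_eq!(end_indexes(2), (48, 36));
    let mut coding = 0u64;
    mark_present(&mut coding, 2, 33);
    assert_eq!(coding, 0b10);
}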


@ -30,7 +30,9 @@ impl Backend for Rocks {
type Error = rocksdb::Error; type Error = rocksdb::Error;
fn open(path: &Path) -> Result<Rocks> { fn open(path: &Path) -> Result<Rocks> {
use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta}; #[cfg(feature = "erasure")]
use crate::blocktree::db::columns::ErasureMeta;
use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
fs::create_dir_all(&path)?; fs::create_dir_all(&path)?;
@ -41,6 +43,7 @@ impl Backend for Rocks {
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
#[cfg(feature = "erasure")]
let erasure_meta_cf_descriptor = let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
@ -49,6 +52,7 @@ impl Backend for Rocks {
meta_cf_descriptor, meta_cf_descriptor,
data_cf_descriptor, data_cf_descriptor,
erasure_cf_descriptor, erasure_cf_descriptor,
#[cfg(feature = "erasure")]
erasure_meta_cf_descriptor, erasure_meta_cf_descriptor,
orphans_cf_descriptor, orphans_cf_descriptor,
]; ];
@ -60,10 +64,13 @@ impl Backend for Rocks {
} }
fn columns(&self) -> Vec<&'static str> { fn columns(&self) -> Vec<&'static str> {
use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta}; #[cfg(feature = "erasure")]
use crate::blocktree::db::columns::ErasureMeta;
use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
vec![ vec![
Coding::NAME, Coding::NAME,
#[cfg(feature = "erasure")]
ErasureMeta::NAME, ErasureMeta::NAME,
Data::NAME, Data::NAME,
Orphans::NAME, Orphans::NAME,
@ -189,6 +196,7 @@ impl TypedColumn<Rocks> for cf::SlotMeta {
type Type = super::SlotMeta; type Type = super::SlotMeta;
} }
#[cfg(feature = "erasure")]
impl Column<Rocks> for cf::ErasureMeta { impl Column<Rocks> for cf::ErasureMeta {
const NAME: &'static str = super::ERASURE_META_CF; const NAME: &'static str = super::ERASURE_META_CF;
type Index = (u64, u64); type Index = (u64, u64);
@ -208,6 +216,7 @@ impl Column<Rocks> for cf::ErasureMeta {
} }
} }
#[cfg(feature = "erasure")]
impl TypedColumn<Rocks> for cf::ErasureMeta { impl TypedColumn<Rocks> for cf::ErasureMeta {
type Type = super::ErasureMeta; type Type = super::ErasureMeta;
} }


@ -3,6 +3,7 @@
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
use crate::entry::{EntrySender, EntrySlice}; use crate::entry::{EntrySender, EntrySlice};
#[cfg(feature = "erasure")]
use crate::erasure::CodingGenerator; use crate::erasure::CodingGenerator;
use crate::packet::index_blobs; use crate::packet::index_blobs;
use crate::poh_recorder::WorkingBankEntries; use crate::poh_recorder::WorkingBankEntries;
@ -28,6 +29,8 @@ pub enum BroadcastStageReturnType {
struct Broadcast { struct Broadcast {
id: Pubkey, id: Pubkey,
#[cfg(feature = "erasure")]
coding_generator: CodingGenerator, coding_generator: CodingGenerator,
} }
@ -116,6 +119,7 @@ impl Broadcast {
blocktree.write_shared_blobs(&blobs)?; blocktree.write_shared_blobs(&blobs)?;
#[cfg(feature = "erasure")]
let coding = self.coding_generator.next(&blobs); let coding = self.coding_generator.next(&blobs);
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
@ -125,10 +129,14 @@ impl Broadcast {
// Send out data // Send out data
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
#[cfg(feature = "erasure")]
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
// send out erasures // generate and transmit any erasure coding blobs. if erasure isn't supported, just send everything again
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; #[cfg(not(feature = "erasure"))]
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
@ -186,11 +194,11 @@ impl BroadcastStage {
storage_entry_sender: EntrySender, storage_entry_sender: EntrySender,
) -> BroadcastStageReturnType { ) -> BroadcastStageReturnType {
let me = cluster_info.read().unwrap().my_data().clone(); let me = cluster_info.read().unwrap().my_data().clone();
let coding_generator = CodingGenerator::default();
let mut broadcast = Broadcast { let mut broadcast = Broadcast {
id: me.id, id: me.id,
coding_generator, #[cfg(feature = "erasure")]
coding_generator: CodingGenerator::new(),
}; };
loop { loop {
@ -276,9 +284,9 @@ mod test {
use crate::entry::create_ticks; use crate::entry::create_ticks;
use crate::service::Service; use crate::service::Service;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -313,9 +321,7 @@ mod test {
let exit_sender = Arc::new(AtomicBool::new(false)); let exit_sender = Arc::new(AtomicBool::new(false));
let (storage_sender, _receiver) = channel(); let (storage_sender, _receiver) = channel();
let bank = Arc::new(Bank::default());
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
// Start up the broadcast stage // Start up the broadcast stage
let broadcast_service = BroadcastStage::new( let broadcast_service = BroadcastStage::new(
@ -335,13 +341,15 @@ mod test {
} }
#[test] #[test]
#[ignore]
//TODO this test won't work since broadcast stage no longer edits the ledger
fn test_broadcast_ledger() { fn test_broadcast_ledger() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path("test_broadcast_ledger"); let ledger_path = get_tmp_ledger_path("test_broadcast_ledger");
{ {
// Create the leader scheduler // Create the leader scheduler
let leader_keypair = Keypair::new(); let leader_keypair = Keypair::new();
let start_tick_height = 0;
let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT;
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let broadcast_service = setup_dummy_broadcast_service( let broadcast_service = setup_dummy_broadcast_service(
@ -350,9 +358,6 @@ mod test {
entry_receiver, entry_receiver,
); );
let bank = broadcast_service.bank.clone(); let bank = broadcast_service.bank.clone();
let start_tick_height = bank.tick_height();
let max_tick_height = bank.max_tick_height();
let ticks_per_slot = bank.ticks_per_slot();
let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
for (i, tick) in ticks.into_iter().enumerate() { for (i, tick) in ticks.into_iter().enumerate() {
@ -362,23 +367,15 @@ mod test {
} }
sleep(Duration::from_millis(2000)); sleep(Duration::from_millis(2000));
trace!(
"[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}",
max_tick_height,
start_tick_height,
ticks_per_slot,
);
let blocktree = broadcast_service.blocktree; let blocktree = broadcast_service.blocktree;
let mut blob_index = 0; let mut blob_index = 0;
for i in 0..max_tick_height - start_tick_height { for i in 0..max_tick_height - start_tick_height {
let slot = (start_tick_height + i + 1) / ticks_per_slot; let slot = (start_tick_height + i + 1) / DEFAULT_TICKS_PER_SLOT;
let result = blocktree.get_data_blob(slot, blob_index).unwrap(); let result = blocktree.get_data_blob(slot, blob_index).unwrap();
blob_index += 1; blob_index += 1;
result.expect("expect blob presence"); assert!(result.is_some());
} }
drop(entry_sender); drop(entry_sender);


@ -1,217 +1,278 @@
//! # Erasure Coding and Recovery // Support erasure coding
//! use crate::packet::{Blob, SharedBlob};
//! Blobs are logically grouped into erasure sets or blocks. Each set contains 16 sequential data use crate::result::{Error, Result};
//! blobs and 4 sequential coding blobs.
//!
//! Coding blobs in each set starting from `start_idx`:
//! For each erasure set:
//! generate `NUM_CODING` coding_blobs.
//! index the coding blobs from `start_idx` to `start_idx + NUM_CODING - 1`.
//!
//! model of an erasure set, with top row being data blobs and second being coding
//! |<======================= NUM_DATA ==============================>|
//! |<==== NUM_CODING ===>|
//! +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
//! | D | | D | | D | | D | | D | | D | | D | | D | | D | | D |
//! +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+
//! | C | | C | | C | | C | | | | | | | | | | | | |
//! +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
//!
//! blob structure for coding blobs
//!
//! + ------- meta is set and used by transport, meta.size is actual length
//! | of data in the byte array blob.data
//! |
//! | + -- data is stuff shipped over the wire, and has an included
//! | | header
//! V V
//! +----------+------------------------------------------------------------+
//! | meta | data |
//! |+---+-- |+---+---+---+---+------------------------------------------+|
//! || s | . || i | | f | s | ||
//! || i | . || n | i | l | i | ||
//! || z | . || d | d | a | z | blob.data(), or blob.data_mut() ||
//! || e | || e | | g | e | ||
//! |+---+-- || x | | s | | ||
//! | |+---+---+---+---+------------------------------------------+|
//! +----------+------------------------------------------------------------+
//! | |<=== coding blob part for "coding" =======>|
//! | |
//! |<============== data blob part for "coding" ==============>|
//!
//!
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::result::Result;
use std::cmp; use std::cmp;
use std::convert::AsMut;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use reed_solomon_erasure::ReedSolomon;
//TODO(sakridge) pick these values //TODO(sakridge) pick these values
/// Number of data blobs pub const NUM_DATA: usize = 16; // number of data blobs
pub const NUM_DATA: usize = 16; pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing
/// Number of coding blobs; also the maximum number that can go missing. pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs
pub const NUM_CODING: usize = 4;
/// Total number of blobs in an erasure set; includes data and coding blobs
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
/// Represents an erasure "session" with a particular configuration and number of data and coding macro_rules! align {
/// blobs ($x:expr, $align:expr) => {
#[derive(Debug, Clone)] $x + ($align - 1) & !($align - 1)
pub struct Session(ReedSolomon); };
/// Generates coding blobs on demand given data blobs
#[derive(Debug, Clone)]
pub struct CodingGenerator {
/// SharedBlobs that couldn't be used in last call to next()
leftover: Vec<SharedBlob>,
session: Arc<Session>,
} }
impl Session { #[derive(Debug, PartialEq, Eq)]
pub fn new(data_count: usize, coding_count: usize) -> Result<Session> { pub enum ErasureError {
let rs = ReedSolomon::new(data_count, coding_count)?; NotEnoughBlocksToDecode,
DecodeError,
EncodeError,
InvalidBlockSize,
InvalidBlobData,
CorruptCoding,
}
Ok(Session(rs)) // k = number of data devices
// m = number of coding devices
// w = word size
extern "C" {
fn jerasure_matrix_encode(
k: i32,
m: i32,
w: i32,
matrix: *const i32,
data_ptrs: *const *const u8,
coding_ptrs: *const *mut u8,
size: i32,
);
fn jerasure_matrix_decode(
k: i32,
m: i32,
w: i32,
matrix: *const i32,
row_k_ones: i32,
erasures: *const i32,
data_ptrs: *const *mut u8,
coding_ptrs: *const *mut u8,
size: i32,
) -> i32;
fn galois_single_divide(a: i32, b: i32, w: i32) -> i32;
fn galois_init_default_field(w: i32) -> i32;
}
use std::sync::Once;
static ERASURE_W_ONCE: Once = Once::new();
// jerasure word size of 32
fn w() -> i32 {
let w = 32;
unsafe {
ERASURE_W_ONCE.call_once(|| {
galois_init_default_field(w);
()
});
} }
w
}
/// Create coding blocks by overwriting `parity` // jerasure checks that arrays are a multiple of w()/8 in length
pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> { fn wb() -> usize {
self.0.encode_sep(data, parity)?; (w() / 8) as usize
}
Ok(()) fn get_matrix(m: i32, k: i32, w: i32) -> Vec<i32> {
} let mut matrix = vec![0; (m * k) as usize];
for i in 0..m {
/// Recover data + coding blocks into data blocks for j in 0..k {
/// # Arguments unsafe {
/// * `data` - array of data blocks to recover into matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w);
/// * `coding` - array of coding blocks
/// * `erasures` - list of indices in data where blocks should be recovered
pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> {
self.0.reconstruct(blocks, present)?;
Ok(())
}
/// Returns `(number_of_data_blobs, number_of_coding_blobs)`
pub fn dimensions(&self) -> (usize, usize) {
(self.0.data_shard_count(), self.0.parity_shard_count())
}
/// Reconstruct any missing blobs in this erasure set if possible
/// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata
/// Assumes that the user has sliced into the blobs appropriately already. else recovery will
/// return an error or garbage data
pub fn reconstruct_blobs<B>(
&self,
blobs: &mut [B],
present: &[bool],
size: usize,
block_start_idx: u64,
slot: u64,
) -> Result<(Vec<Blob>, Vec<Blob>)>
where
B: AsMut<[u8]>,
{
let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect();
trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,);
// Decode the blocks
self.decode_blocks(blocks.as_mut_slice(), &present)?;
let mut recovered_data = vec![];
let mut recovered_coding = vec![];
let erasures = present
.iter()
.enumerate()
.filter_map(|(i, present)| if *present { None } else { Some(i) });
// Create the missing blobs from the reconstructed data
for n in erasures {
let data_size;
let idx;
let first_byte;
if n < NUM_DATA {
let mut blob = Blob::new(&blocks[n]);
data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
idx = n as u64 + block_start_idx;
first_byte = blob.data[0];
blob.set_size(data_size);
recovered_data.push(blob);
} else {
let mut blob = Blob::default();
blob.data_mut()[..size].copy_from_slice(&blocks[n]);
data_size = size;
idx = (n as u64 + block_start_idx) - NUM_DATA as u64;
first_byte = blob.data[0];
blob.set_slot(slot);
blob.set_index(idx);
blob.set_size(data_size);
recovered_coding.push(blob);
} }
trace!(
"[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
n,
idx,
data_size,
first_byte
);
} }
}
matrix
}
Ok((recovered_data, recovered_coding)) // Generate coding blocks into coding
// There are some alignment restrictions, blocks should be aligned by 16 bytes
// which means their size should be >= 16 bytes
fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
if data.is_empty() {
return Ok(());
}
let k = data.len() as i32;
let m = coding.len() as i32;
let block_len = data[0].len() as i32;
let matrix: Vec<i32> = get_matrix(m, k, w());
let mut data_arg = Vec::with_capacity(data.len());
for block in data {
if block_len != block.len() as i32 {
error!(
"data block size incorrect {} expected {}",
block.len(),
block_len
);
return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
}
data_arg.push(block.as_ptr());
}
let mut coding_arg = Vec::with_capacity(coding.len());
for block in coding {
if block_len != block.len() as i32 {
error!(
"coding block size incorrect {} expected {}",
block.len(),
block_len
);
return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
}
coding_arg.push(block.as_mut_ptr());
}
unsafe {
jerasure_matrix_encode(
k,
m,
w(),
matrix.as_ptr(),
data_arg.as_ptr(),
coding_arg.as_ptr(),
block_len,
);
}
Ok(())
}
// Recover data + coding blocks into data blocks
// data: array of blocks to recover into
// coding: arry of coding blocks
// erasures: list of indices in data where blocks should be recovered
pub fn decode_blocks(
data: &mut [&mut [u8]],
coding: &mut [&mut [u8]],
erasures: &[i32],
) -> Result<()> {
if data.is_empty() {
return Ok(());
}
let block_len = data[0].len();
let matrix: Vec<i32> = get_matrix(coding.len() as i32, data.len() as i32, w());
// generate coding pointers, blocks should be the same size
let mut coding_arg: Vec<*mut u8> = Vec::new();
for x in coding.iter_mut() {
if x.len() != block_len {
return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
}
coding_arg.push(x.as_mut_ptr());
}
// generate data pointers, blocks should be the same size
let mut data_arg: Vec<*mut u8> = Vec::new();
for x in data.iter_mut() {
if x.len() != block_len {
return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
}
data_arg.push(x.as_mut_ptr());
}
let ret = unsafe {
jerasure_matrix_decode(
data.len() as i32,
coding.len() as i32,
w(),
matrix.as_ptr(),
0,
erasures.as_ptr(),
data_arg.as_ptr(),
coding_arg.as_ptr(),
data[0].len() as i32,
)
};
trace!("jerasure_matrix_decode ret: {}", ret);
for x in data[erasures[0] as usize][0..8].iter() {
trace!("{} ", x)
}
trace!("");
if ret < 0 {
return Err(Error::ErasureError(ErasureError::DecodeError));
}
Ok(())
}
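
Taken together, generate_coding_blocks and decode_blocks form a block-level encode/recover round trip. The sketch below is illustrative only, written as if it were a test inside this module (generate_coding_blocks is private); the block length, erased indexes, and function name are chosen for the example and assume the Jerasure libraries are linked:

#[test]
fn erasure_round_trip_sketch() {
    const BLOCK_LEN: usize = 32; // multiple of 16 bytes, per the alignment note above

    // 16 data blocks with recognizable contents and 4 zeroed coding blocks.
    let mut data: Vec<Vec<u8>> = (0..NUM_DATA).map(|i| vec![i as u8; BLOCK_LEN]).collect();
    let mut coding: Vec<Vec<u8>> = vec![vec![0u8; BLOCK_LEN]; NUM_CODING];

    {
        let data_refs: Vec<&[u8]> = data.iter().map(|b| b.as_slice()).collect();
        let mut coding_refs: Vec<&mut [u8]> = coding.iter_mut().map(|b| b.as_mut_slice()).collect();
        generate_coding_blocks(&mut coding_refs, &data_refs).unwrap();
    }

    // Lose two data blocks (at most NUM_CODING blobs may go missing); the
    // caller supplies zeroed buffers of the right length for the erased slots.
    let expected = data[3].clone();
    for byte in data[3].iter_mut() {
        *byte = 0;
    }
    for byte in data[7].iter_mut() {
        *byte = 0;
    }
    // Set-relative erasure list: data erasures first, terminated by -1.
    let erasures = vec![3, 7, -1];

    let mut data_refs: Vec<&mut [u8]> = data.iter_mut().map(|b| b.as_mut_slice()).collect();
    let mut coding_refs: Vec<&mut [u8]> = coding.iter_mut().map(|b| b.as_mut_slice()).collect();
    decode_blocks(&mut data_refs, &mut coding_refs, &erasures).unwrap();

    assert_eq!(data[3], expected);
}
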
// Generate coding blocks in window starting from start_idx,
// for num_blobs.. For each block place the coding blobs
// at the start of the block like so:
//
// model of an erasure set, with top row being data blobs and second being coding
// |<======================= NUM_DATA ==============================>|
// |<==== NUM_CODING ===>|
// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
// | D | | D | | D | | D | | D | | D | | D | | D | | D | | D |
// +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+
// | C | | C | | C | | C | | | | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
//
// blob structure for coding, recover
//
// + ------- meta is set and used by transport, meta.size is actual length
// | of data in the byte array blob.data
// |
// | + -- data is stuff shipped over the wire, and has an included
// | | header
// V V
// +----------+------------------------------------------------------------+
// | meta | data |
// |+---+-- |+---+---+---+---+------------------------------------------+|
// || s | . || i | | f | s | ||
// || i | . || n | i | l | i | ||
// || z | . || d | d | a | z | blob.data(), or blob.data_mut() ||
// || e | || e | | g | e | ||
// |+---+-- || x | | s | | ||
// | |+---+---+---+---+------------------------------------------+|
// +----------+------------------------------------------------------------+
// | |<=== coding blob part for "coding" =======>|
// | |
// |<============== data blob part for "coding" ==============>|
//
//
//
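
// A small illustrative sketch (not in the original file) of the index math implied by
// the diagram above: every NUM_DATA consecutive data blobs form one erasure set, and the
// set's NUM_CODING coding blobs take the indices of the first NUM_CODING data blobs in
// that set.
#[allow(dead_code)]
fn erasure_set_of(blob_index: u64) -> (u64, u64) {
    let set_index = blob_index / NUM_DATA as u64;
    // same as blob_index - (blob_index % NUM_DATA), as used by the tests below
    let set_start = set_index * NUM_DATA as u64;
    (set_index, set_start)
}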
pub struct CodingGenerator {
leftover: Vec<SharedBlob>, // SharedBlobs that couldn't be used in last call to next()
}
impl Default for CodingGenerator {
fn default() -> Self {
CodingGenerator {
leftover: Vec::with_capacity(NUM_DATA),
}
}
}
}

impl CodingGenerator {
pub fn new() -> Self {
Self::default()
}

// must be called with consecutive data blobs from the previous invocation;
// blobs from a new slot must not start halfway through next_data
pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
let mut next_coding =
Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);

if self.leftover.len() > 0 && next_data.len() > 0 {
if self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() {
self.leftover.clear(); // reset on slot boundaries
}
}

let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();

for data_blobs in next_data.chunks(NUM_DATA) {
if data_blobs.len() < NUM_DATA {
self.leftover = data_blobs.to_vec();
break;
}
self.leftover.clear();

// find max_data_size for the chunk, round length up to a multiple of wb()
let max_data_size = align!(
data_blobs
.iter()
.fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)),
wb()
);

let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
let data_ptrs: Vec<_> = data_locks
@@ -219,9 +280,9 @@ impl CodingGenerator {
.map(|l| &l.data[..max_data_size])
.collect();

let mut coding_blobs = Vec::with_capacity(NUM_CODING);

for data_blob in &data_locks[..NUM_CODING] {
let index = data_blob.index();
let slot = data_blob.slot();
let id = data_blob.id();
@@ -244,7 +305,7 @@ impl CodingGenerator {
.map(|blob| &mut blob.data_mut()[..max_data_size])
.collect();

generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)
}
.is_ok()
{
@@ -259,22 +320,6 @@ impl CodingGenerator {
}
}
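
// A usage sketch (assumed; it mirrors the tests below): feed data blobs in order and
// collect coding blobs whenever a full erasure set of NUM_DATA blobs has been seen.
#[allow(dead_code)]
fn example_generate_coding_for(data_blobs: &[SharedBlob]) -> Vec<SharedBlob> {
    let mut coding_generator = CodingGenerator::new();
    let mut coding_blobs = Vec::new();
    for chunk in data_blobs.chunks(NUM_DATA) {
        // next() returns an empty Vec until it has seen a complete set of NUM_DATA
        // blobs; leftovers are carried into the next call within the same slot.
        coding_blobs.append(&mut coding_generator.next(chunk));
    }
    coding_blobs
}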
impl Default for Session {
fn default() -> Session {
Session::new(NUM_DATA, NUM_CODING).unwrap()
}
}
impl Default for CodingGenerator {
fn default() -> Self {
let session = Session::default();
CodingGenerator {
leftover: Vec::with_capacity(session.0.data_shard_count()),
session: Arc::new(session),
}
}
}
#[cfg(test)]
pub mod test {
use super::*;
@@ -323,63 +368,63 @@ pub mod test {
#[test]
fn test_coding() {
let zero_vec = vec![0; 16];
let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
let v_orig: Vec<u8> = vs[0].clone();

let m = 2;
let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect();

{
let mut coding_blocks_slices: Vec<_> =
coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();

assert!(generate_coding_blocks(
coding_blocks_slices.as_mut_slice(),
v_slices.as_slice(),
)
.is_ok());
}

trace!("test_coding: coding blocks:");
for b in &coding_blocks {
trace!("test_coding: {:?}", b);
}

let erasure: i32 = 1;
let erasures = vec![erasure, -1];
// clear an entry
vs[erasure as usize].copy_from_slice(zero_vec.as_slice());

{
let mut coding_blocks_slices: Vec<_> =
coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();

assert!(decode_blocks(
v_slices.as_mut_slice(),
coding_blocks_slices.as_mut_slice(),
erasures.as_slice(),
)
.is_ok());
}

trace!("test_coding: vs:");
for v in &vs {
trace!("test_coding: {:?}", v);
}
assert_eq!(v_orig, vs[0]);
}

fn test_toss_and_recover(
data_blobs: &[SharedBlob],
coding_blobs: &[SharedBlob],
block_start_idx: usize,
) {
let size = coding_blobs[0].read().unwrap().size();

// toss one data and one coding
let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];

let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);

blobs.push(SharedBlob::default()); // empty data, erasure at zero
@@ -387,23 +432,14 @@ pub mod test {
// skip first blob
blobs.push(blob.clone());
}

blobs.push(SharedBlob::default()); // empty coding, erasure at zero
for blob in &coding_blobs[1..NUM_CODING] {
blobs.push(blob.clone());
}

let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
assert!(!corrupt);

assert_eq!(
blobs[1].read().unwrap().meta,
@@ -414,15 +450,15 @@ pub mod test {
data_blobs[block_start_idx + 1].read().unwrap().data()
);
assert_eq!(
blobs[0].read().unwrap().meta,
data_blobs[block_start_idx].read().unwrap().meta
);
assert_eq!(
blobs[0].read().unwrap().data(),
data_blobs[block_start_idx].read().unwrap().data()
);
assert_eq!(
blobs[NUM_DATA].read().unwrap().data(),
coding_blobs[0].read().unwrap().data()
);
}
@@ -432,11 +468,11 @@ pub mod test {
solana_logger::setup();

// trivial case
let mut coding_generator = CodingGenerator::new();
let blobs = Vec::new();
for _ in 0..NUM_DATA * 2 {
let coding = coding_generator.next(&blobs);
assert_eq!(coding.len(), 0);
}

// test coding by iterating one blob at a time
@@ -444,7 +480,6 @@ pub mod test {
for (i, blob) in data_blobs.iter().cloned().enumerate() {
let coding_blobs = coding_generator.next(&[blob]);

if !coding_blobs.is_empty() {
assert_eq!(i % NUM_DATA, NUM_DATA - 1);
assert_eq!(coding_blobs.len(), NUM_CODING);
@@ -455,12 +490,7 @@ pub mod test {
((i / NUM_DATA) * NUM_DATA + j) as u64
);
}

test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA));
}
}
}
@@ -469,7 +499,7 @@ pub mod test {
fn test_erasure_generate_coding_reset_on_new_slot() {
solana_logger::setup();

let mut coding_generator = CodingGenerator::new();

// test coding by iterating one blob at a time
let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
@@ -479,18 +509,13 @@ pub mod test {
}

let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
assert_eq!(coding_blobs.len(), 0);

let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);
assert_eq!(coding_blobs.len(), NUM_CODING);

test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA);
}
#[test]
@@ -546,17 +571,24 @@ pub mod test {
}
}

/// This test is ignored because if successful, it never stops running. It is useful for
/// discovering an initialization race-condition in the erasure FFI bindings. If this bug
/// re-emerges, running with `Z_THREADS = N` where `N > 1` should crash fairly rapidly.
#[ignore]
#[test]
fn test_recovery_with_model() {
use std::env;
use std::sync::{Arc, Mutex};
use std::thread;

const MAX_ERASURE_SETS: u64 = 16;
solana_logger::setup();
let n_threads: usize = env::var("Z_THREADS")
.unwrap_or("1".to_string())
.parse()
.unwrap();

let specs = (0..).map(|slot| {
let num_erasure_sets = slot % MAX_ERASURE_SETS;
let set_specs = (0..num_erasure_sets)
@@ -570,12 +602,12 @@ pub mod test {
SlotSpec { slot, set_specs }
});

let decode_mutex = Arc::new(Mutex::new(()));
let mut handles = vec![];

for i in 0..n_threads {
let specs = specs.clone();
let decode_mutex = Arc::clone(&decode_mutex);

let handle = thread::Builder::new()
.name(i.to_string())
@@ -585,39 +617,55 @@ pub mod test {
let erased_coding = erasure_set.coding[0].clone();
let erased_data = erasure_set.data[..3].to_vec();

let mut data = Vec::with_capacity(NUM_DATA);
let mut coding = Vec::with_capacity(NUM_CODING);
let erasures = vec![0, 1, 2, NUM_DATA as i32, -1];

data.push(SharedBlob::default());
data.push(SharedBlob::default());
data.push(SharedBlob::default());
for blob in erasure_set.data.into_iter().skip(3) {
data.push(blob);
}

coding.push(SharedBlob::default());
for blob in erasure_set.coding.into_iter().skip(1) {
coding.push(blob);
}

let size = erased_coding.read().unwrap().data_size() as usize;

let mut data_locks: Vec<_> =
data.iter().map(|shared| shared.write().unwrap()).collect();
let mut coding_locks: Vec<_> = coding
.iter()
.map(|shared| shared.write().unwrap())
.collect();

let mut data_ptrs: Vec<_> = data_locks
.iter_mut()
.map(|blob| &mut blob.data[..size])
.collect();
let mut coding_ptrs: Vec<_> = coding_locks
.iter_mut()
.map(|blob| &mut blob.data_mut()[..size])
.collect();

{
let _lock = decode_mutex.lock();

decode_blocks(
data_ptrs.as_mut_slice(),
coding_ptrs.as_mut_slice(),
&erasures,
)
.expect("decoding must succeed");
}

drop(coding_locks);
drop(data_locks);

for (expected, recovered) in erased_data.iter().zip(data.iter()) {
let expected = expected.read().unwrap();
let mut recovered = recovered.write().unwrap();
let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
@@ -629,7 +677,7 @@ pub mod test {
assert_eq!(
erased_coding.read().unwrap().data(),
coding[0].read().unwrap().data()
);
debug!("passed set: {}", erasure_set.set_index);
@@ -654,9 +702,7 @@ pub mod test {
IntoIt: Iterator<Item = S> + Clone + 'a,
S: Borrow<SlotSpec>,
{
specs.into_iter().map(|spec| {
let spec = spec.borrow();
let slot = spec.slot;
@@ -676,6 +722,7 @@ pub mod test {
0,
);

let mut coding_generator = CodingGenerator::new();
let mut coding_blobs = coding_generator.next(&blobs);

blobs.drain(erasure_spec.num_data..);
@@ -730,33 +777,77 @@ pub mod test {
blobs
}

fn decode_blobs(
blobs: &[SharedBlob],
erasures: &[i32],
size: usize,
block_start_idx: u64,
slot: u64,
) -> Result<bool> {
let mut locks = Vec::with_capacity(ERASURE_SET_SIZE);
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);

assert_eq!(blobs.len(), ERASURE_SET_SIZE);
for b in blobs {
locks.push(b.write().unwrap());
}
for (i, l) in locks.iter_mut().enumerate() {
if i < NUM_DATA {
data_ptrs.push(&mut l.data[..size]);
} else {
coding_ptrs.push(&mut l.data_mut()[..size]);
}
}
// Decode the blocks
decode_blocks(
data_ptrs.as_mut_slice(),
coding_ptrs.as_mut_slice(),
&erasures,
)?;
// Create the missing blobs from the reconstructed data
let mut corrupt = false;
for i in &erasures[..erasures.len() - 1] {
let n = *i as usize;
let mut idx = n as u64 + block_start_idx;
let mut data_size;
if n < NUM_DATA {
data_size = locks[n].data_size() as usize;
data_size -= BLOB_HEADER_SIZE;
if data_size > BLOB_DATA_SIZE {
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
corrupt = true;
break;
}
} else {
data_size = size;
idx -= NUM_DATA as u64;
locks[n].set_slot(slot);
locks[n].set_index(idx);
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
corrupt = true;
break;
}
}
locks[n].set_size(data_size);
trace!(
"erasures[{}] ({}) size: {} data[0]: {}",
*i,
idx,
data_size,
locks[n].data()[0]
);
}
Ok(corrupt)
}
}

core/src/lib.rs

@@ -31,6 +31,7 @@ pub mod cluster;
pub mod cluster_info;
pub mod cluster_tests;
pub mod entry;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod fetch_stage;
pub mod fullnode;

core/src/result.rs

@@ -2,6 +2,8 @@
use crate::blocktree;
use crate::cluster_info;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::packet;
use crate::poh_recorder;
use bincode;
@@ -23,7 +25,8 @@ pub enum Error {
TransactionError(transaction::TransactionError),
ClusterInfoError(cluster_info::ClusterInfoError),
BlobError(packet::BlobError),
#[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError),
SendError,
PohRecorderError(poh_recorder::PohRecorderError),
BlocktreeError(blocktree::BlocktreeError),
@@ -64,8 +67,9 @@ impl std::convert::From<cluster_info::ClusterInfoError> for Error {
Error::ClusterInfoError(e)
}
}

#[cfg(feature = "erasure")]
impl std::convert::From<erasure::ErasureError> for Error {
fn from(e: erasure::ErasureError) -> Error {
Error::ErasureError(e)
}
}
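
// A small sketch (not part of this diff) of why the From impl above matters: inside
// functions returning this crate's Result, the ? operator converts a raw
// erasure::ErasureError into Error automatically. The inner helper is hypothetical
// and only illustrates the mechanism.
#[cfg(feature = "erasure")]
#[allow(dead_code)]
fn example_propagate_erasure_error() -> Result<()> {
    fn failing_step() -> std::result::Result<(), erasure::ErasureError> {
        Err(erasure::ErasureError::InvalidBlockSize)
    }
    // `?` wraps the ErasureError into Error::ErasureError via the From impl.
    failing_step()?;
    Ok(())
}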