From 6eac5951ed65cb31636e829becb4cc69337716de Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Thu, 18 Apr 2019 21:56:43 -0700 Subject: [PATCH] Revert "Revert "revert-revert-erasure and erasure fixes (#3833)" (#3855)" (#3889) This reverts commit 596f611ede4018b5ccfcff533b1464c3dcbef946. --- Cargo.lock | 13 + core/Cargo.toml | 1 + core/build.rs | 29 +- core/src/blocktree.rs | 462 ++++++++++----------- core/src/blocktree/db.rs | 1 - core/src/blocktree/kvs.rs | 2 - core/src/blocktree/meta.rs | 141 +++++-- core/src/blocktree/rocks.rs | 13 +- core/src/broadcast_stage.rs | 43 +- core/src/erasure.rs | 772 ++++++++++++++++-------------------- core/src/lib.rs | 1 - core/src/packet.rs | 8 +- core/src/result.rs | 10 +- 13 files changed, 707 insertions(+), 789 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1d3d353fc..fa87bec69f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1909,6 +1909,17 @@ dependencies = [ "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "reed-solomon-erasure" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cc 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "regex" version = "1.1.2" @@ -2203,6 +2214,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3597,6 +3609,7 @@ dependencies = [ "checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828" +"checksum reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77cbbd4c02f53e345fe49e74255a1b10080731ffb2a03475e11df7fc8a043c37" "checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f" "checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" diff --git a/core/Cargo.toml b/core/Cargo.toml index 0c002a3414..591c35a68f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -41,6 +41,7 @@ nix = "0.13.0" rand = "0.6.5" rand_chacha = "0.1.1" rayon = "1.0.0" +reed-solomon-erasure = "3.1.1" reqwest = "0.9.11" rocksdb = "0.11.0" serde = "1.0.89" diff --git a/core/build.rs b/core/build.rs index 
4a38003775..7c8cfebb3b 100644 --- a/core/build.rs +++ b/core/build.rs @@ -24,9 +24,8 @@ fn main() { let chacha = !env::var("CARGO_FEATURE_CHACHA").is_err(); let cuda = !env::var("CARGO_FEATURE_CUDA").is_err(); - let erasure = !env::var("CARGO_FEATURE_ERASURE").is_err(); - if chacha || cuda || erasure { + if chacha || cuda { println!("cargo:rerun-if-changed={}", perf_libs_dir); println!("cargo:rustc-link-search=native={}", perf_libs_dir); } @@ -46,30 +45,4 @@ fn main() { println!("cargo:rustc-link-lib=dylib=cuda"); println!("cargo:rustc-link-lib=dylib=cudadevrt"); } - if erasure { - #[cfg(any(target_os = "macos", target_os = "ios"))] - { - println!( - "cargo:rerun-if-changed={}/libgf_complete.dylib", - perf_libs_dir - ); - println!("cargo:rerun-if-changed={}/libJerasure.dylib", perf_libs_dir); - } - #[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))] - { - println!("cargo:rerun-if-changed={}/libgf_complete.so", perf_libs_dir); - println!("cargo:rerun-if-changed={}/libJerasure.so", perf_libs_dir); - } - #[cfg(windows)] - { - println!( - "cargo:rerun-if-changed={}/libgf_complete.dll", - perf_libs_dir - ); - println!("cargo:rerun-if-changed={}/libJerasure.dll", perf_libs_dir); - } - - println!("cargo:rustc-link-lib=dylib=Jerasure"); - println!("cargo:rustc-link-lib=dylib=gf_complete"); - } } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 3fdfd39f24..3f341e1826 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -3,7 +3,6 @@ //! access read to a persistent file-based ledger. use crate::entry::Entry; -#[cfg(feature = "erasure")] use crate::erasure; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; @@ -17,7 +16,6 @@ use hashbrown::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; -#[cfg(feature = "erasure")] use solana_metrics::counter::Counter; use solana_sdk::genesis_block::GenesisBlock; @@ -79,9 +77,9 @@ pub struct Blocktree { meta_cf: LedgerColumn, data_cf: LedgerColumn, erasure_cf: LedgerColumn, - #[cfg(feature = "erasure")] erasure_meta_cf: LedgerColumn, orphans_cf: LedgerColumn, + session: Arc, pub new_blobs_signals: Vec>, pub root_slot: RwLock, } @@ -92,7 +90,6 @@ pub const META_CF: &str = "meta"; pub const DATA_CF: &str = "data"; // Column family for erasure data pub const ERASURE_CF: &str = "erasure"; -#[cfg(feature = "erasure")] pub const ERASURE_META_CF: &str = "erasure_meta"; // Column family for orphans data pub const ORPHANS_CF: &str = "orphans"; @@ -116,7 +113,7 @@ impl Blocktree { // Create the erasure column family let erasure_cf = LedgerColumn::new(&db); - #[cfg(feature = "erasure")] + let erasure_meta_cf = LedgerColumn::new(&db); // Create the orphans column family. 
An "orphan" is defined as @@ -124,14 +121,17 @@ impl Blocktree { // known parent let orphans_cf = LedgerColumn::new(&db); + // setup erasure + let session = Arc::new(erasure::Session::default()); + Ok(Blocktree { db, meta_cf, data_cf, erasure_cf, - #[cfg(feature = "erasure")] erasure_meta_cf, orphans_cf, + session, new_blobs_signals: vec![], root_slot: RwLock::new(0), }) @@ -259,7 +259,6 @@ impl Blocktree { // A map from slot to a 2-tuple of metadata: (working copy, backup copy), // so we can detect changes to the slot metadata later let mut slot_meta_working_set = HashMap::new(); - #[cfg(feature = "erasure")] let mut erasure_meta_working_set = HashMap::new(); let new_blobs: Vec<_> = new_blobs.into_iter().collect(); let mut prev_inserted_blob_datas = HashMap::new(); @@ -301,20 +300,17 @@ impl Blocktree { continue; } - #[cfg(feature = "erasure")] - { - let set_index = ErasureMeta::set_index_for(blob.index()); - let erasure_meta_entry = erasure_meta_working_set - .entry((blob_slot, set_index)) - .or_insert_with(|| { - self.erasure_meta_cf - .get((blob_slot, set_index)) - .expect("Expect database get to succeed") - .unwrap_or_else(|| ErasureMeta::new(set_index)) - }); + let set_index = ErasureMeta::set_index_for(blob.index()); + let erasure_meta_entry = erasure_meta_working_set + .entry((blob_slot, set_index)) + .or_insert_with(|| { + self.erasure_meta_cf + .get((blob_slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index)) + }); - erasure_meta_entry.set_data_present(blob.index()); - } + erasure_meta_entry.set_data_present(blob.index(), true); let _ = self.insert_data_blob( blob, @@ -339,11 +335,8 @@ impl Blocktree { } } - #[cfg(feature = "erasure")] - { - for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { - write_batch.put::((*slot, *set_index), erasure_meta)?; - } + for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { + write_batch.put::((*slot, *set_index), erasure_meta)?; } self.db.write(write_batch)?; @@ -354,36 +347,8 @@ impl Blocktree { } } - #[cfg(feature = "erasure")] for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() { - if erasure_meta.can_recover() { - match self.recover(slot, set_index) { - Ok(recovered) => { - inc_new_counter_info!("erasures-recovered", recovered); - } - Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => { - let mut erasure_meta = self - .erasure_meta_cf - .get((slot, set_index))? - .expect("erasure meta should exist"); - - let mut batch = self.db.batch()?; - - let start_index = erasure_meta.start_index(); - let (_, coding_end_idx) = erasure_meta.end_indexes(); - - erasure_meta.coding = 0; - batch.put::((slot, set_index), &erasure_meta)?; - - for idx in start_index..coding_end_idx { - batch.delete::((slot, idx))?; - } - - self.db.write(batch)?; - } - Err(e) => return Err(e), - } - } + self.try_erasure_recover(&erasure_meta, slot, set_index)?; } Ok(()) @@ -453,26 +418,42 @@ impl Blocktree { pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.erasure_cf.get_bytes((slot, index)) } + pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { - self.erasure_cf.delete((slot, index)) + let set_index = ErasureMeta::set_index_for(index); + + let mut erasure_meta = self + .erasure_meta_cf + .get((slot, set_index))? 
+ .unwrap_or_else(|| ErasureMeta::new(set_index)); + + erasure_meta.set_coding_present(index, false); + + let mut batch = self.db.batch()?; + + batch.delete::((slot, index))?; + batch.put::((slot, set_index), &erasure_meta)?; + + self.db.write(batch)?; + Ok(()) } + pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.data_cf.get_bytes((slot, index)) } + /// For benchmarks, testing, and setup. + /// Does no metadata tracking. Use with care. + pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + self.data_cf.put_bytes((slot, index), bytes) + } + pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { self.erasure_cf.put_bytes((slot, index), bytes) } - #[cfg(not(feature = "erasure"))] - #[inline] - pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.put_coding_blob_bytes_raw(slot, index, bytes) - } - /// this function will insert coding blobs and also automatically track erasure-related /// metadata. If recovery is available it will be done - #[cfg(feature = "erasure")] pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { let set_index = ErasureMeta::set_index_for(index); let mut erasure_meta = self @@ -480,7 +461,7 @@ impl Blocktree { .get((slot, set_index))? .unwrap_or_else(|| ErasureMeta::new(set_index)); - erasure_meta.set_coding_present(index); + erasure_meta.set_coding_present(index, true); let mut writebatch = self.db.batch()?; @@ -490,43 +471,28 @@ impl Blocktree { self.db.write(writebatch)?; - if erasure_meta.can_recover() { - match self.recover(slot, set_index) { - Ok(recovered) => { - inc_new_counter_info!("erasures-recovered", recovered); - return Ok(()); - } - Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => { - let start_index = erasure_meta.start_index(); - let (_, coding_end_idx) = erasure_meta.end_indexes(); - let mut batch = self.db.batch()?; + self.try_erasure_recover(&erasure_meta, slot, set_index) + } - erasure_meta.coding = 0; - batch.put::((slot, set_index), &erasure_meta)?; - - for idx in start_index..coding_end_idx { - batch.delete::((slot, idx as u64))?; - } - - self.db.write(batch)?; - - return Ok(()); - } - Err(e) => return Err(e), + fn try_erasure_recover( + &self, + erasure_meta: &ErasureMeta, + slot: u64, + set_index: u64, + ) -> Result<()> { + match erasure_meta.status() { + ErasureMetaStatus::CanRecover => { + let recovered = self.recover(slot, set_index)?; + inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered); } + ErasureMetaStatus::StillNeed(needed) => { + inc_new_counter_info!("blocktree-erasure-blobs_needed", needed) + } + ErasureMetaStatus::DataFull => inc_new_counter_info!("blocktree-erasure-complete", 1), } - Ok(()) } - pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> { - self.data_cf.put_bytes((slot, index), value) - } - - pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.data_cf.put_bytes((slot, index), bytes) - } - pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result> { let bytes = self.get_data_blob_bytes(slot, blob_index)?; Ok(bytes.map(|bytes| { @@ -626,20 +592,6 @@ impl Blocktree { } } - pub fn find_missing_coding_indexes( - &self, - slot: u64, - start_index: u64, - end_index: u64, - max_missing: usize, - ) -> Vec { - if let Ok(mut db_iterator) = self.erasure_cf.cursor() { - Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, 
max_missing) - } else { - vec![] - } - } - /// Returns the entry vector for the slot starting with `blob_start_index` pub fn get_slot_entries( &self, @@ -1088,43 +1040,45 @@ impl Blocktree { Ok(()) } - #[cfg(feature = "erasure")] /// Attempts recovery using erasure coding fn recover(&self, slot: u64, set_index: u64) -> Result { - use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA}; - use crate::packet::BLOB_DATA_SIZE; + use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA}; let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap(); let start_idx = erasure_meta.start_index(); let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); - let mut erasures = Vec::with_capacity(NUM_CODING + 1); - let (mut data, mut coding) = (vec![], vec![]); + let present = &mut [true; ERASURE_SET_SIZE]; + let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); let mut size = 0; for i in start_idx..coding_end_idx { if erasure_meta.is_coding_present(i) { - let blob_bytes = self + let mut blob_bytes = self .erasure_cf .get_bytes((slot, i))? .expect("erasure_meta must have no false positives"); + blob_bytes.drain(..BLOB_HEADER_SIZE); + if size == 0 { - size = blob_bytes.len() - BLOB_HEADER_SIZE; + size = blob_bytes.len(); } - coding.push(blob_bytes); + blobs.push(blob_bytes); } else { - let set_relative_idx = (i - start_idx) + NUM_DATA as u64; - coding.push(vec![0; crate::packet::BLOB_SIZE]); - erasures.push(set_relative_idx as i32); + let set_relative_idx = (i - start_idx) as usize + NUM_DATA; + blobs.push(vec![0; size]); + present[set_relative_idx] = false; } } assert_ne!(size, 0); for i in start_idx..data_end_idx { + let set_relative_idx = (i - start_idx) as usize; + if erasure_meta.is_data_present(i) { let mut blob_bytes = self .data_cf @@ -1132,90 +1086,28 @@ impl Blocktree { .expect("erasure_meta must have no false positives"); // If data is too short, extend it with zeroes - if blob_bytes.len() < size { - blob_bytes.resize(size, 0u8); - } + blob_bytes.resize(size, 0u8); - data.push(blob_bytes); + blobs.insert(set_relative_idx, blob_bytes); } else { - let set_relative_index = i - start_idx; - data.push(vec![0; size]); + blobs.insert(set_relative_idx, vec![0u8; size]); // data erasures must come before any coding erasures if present - erasures.insert(0, set_relative_index as i32); + present[set_relative_idx] = false; } } - let mut coding_ptrs: Vec<_> = coding - .iter_mut() - .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size]) - .collect(); + let (recovered_data, recovered_coding) = self + .session + .reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; - let mut data_ptrs: Vec<_> = data - .iter_mut() - .map(|data_bytes| &mut data_bytes[..size]) - .collect(); + let amount_recovered = recovered_data.len() + recovered_coding.len(); - // Marks the end - erasures.push(-1); - trace!("erasures: {:?}, size: {}", erasures, size); - - erasure::decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, - )?; - - // Create the missing blobs from the reconstructed data - let block_start_idx = erasure_meta.start_index(); - let (mut recovered_data, mut recovered_coding) = (vec![], vec![]); - - for i in &erasures[..erasures.len() - 1] { - let n = *i as usize; - - let (data_size, idx, first_byte); - - if n < NUM_DATA { - let mut blob = Blob::new(&data_ptrs[n]); - - idx = n as u64 + block_start_idx; - data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; - first_byte = blob.data[0]; - - if data_size > BLOB_DATA_SIZE { - error!("corrupt 
data blob[{}] data_size: {}", idx, data_size); - return Err(Error::ErasureError(ErasureError::CorruptCoding)); - } - - blob.set_slot(slot); - blob.set_index(idx); - blob.set_size(data_size); - recovered_data.push(blob); - } else { - let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]); - - idx = (n - NUM_DATA) as u64 + block_start_idx; - data_size = size; - first_byte = blob.data[0]; - - if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { - error!("corrupt coding blob[{}] data_size: {}", idx, data_size); - return Err(Error::ErasureError(ErasureError::CorruptCoding)); - } - - blob.set_slot(slot); - blob.set_index(idx); - blob.set_data_size(data_size as u64); - recovered_coding.push(blob); - } - - trace!( - "erasures[{}] ({}) size: {} data[0]: {}", - *i, - idx, - data_size, - first_byte, - ); - } + trace!( + "[recover] reconstruction OK slot: {}, indexes: [{},{})", + slot, + start_idx, + data_end_idx + ); self.write_blobs(recovered_data)?; @@ -1223,7 +1115,7 @@ impl Blocktree { self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?; } - Ok(erasures.len() - 1) + Ok(amount_recovered) } /// Returns the next consumed index and the number of ticks in the new consumed @@ -1821,44 +1713,47 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive"); { let blocktree = Blocktree::open(&blocktree_path).unwrap(); - let slot = 0; - let parent_slot = 0; - // Write entries - let num_entries = 21 as u64; - let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries); + for i in 0..4 { + let slot = i; + let parent_slot = if i == 0 { 0 } else { i - 1 }; + // Write entries + let num_entries = 21 as u64 * (i + 1); + let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries); - blocktree - .write_blobs(blobs.iter().skip(1).step_by(2)) - .unwrap(); + blocktree + .write_blobs(blobs.iter().skip(1).step_by(2)) + .unwrap(); - assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); + assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); - let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); - if num_entries % 2 == 0 { + let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); + if num_entries % 2 == 0 { + assert_eq!(meta.received, num_entries); + } else { + debug!("got here"); + assert_eq!(meta.received, num_entries - 1); + } + assert_eq!(meta.consumed, 0); + assert_eq!(meta.parent_slot, parent_slot); + if num_entries % 2 == 0 { + assert_eq!(meta.last_index, num_entries - 1); + } else { + assert_eq!(meta.last_index, std::u64::MAX); + } + + blocktree.write_blobs(blobs.iter().step_by(2)).unwrap(); + + assert_eq!( + blocktree.get_slot_entries(slot, 0, None).unwrap(), + original_entries, + ); + + let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); assert_eq!(meta.received, num_entries); - } else { - assert_eq!(meta.received, num_entries - 1); - } - assert_eq!(meta.consumed, 0); - assert_eq!(meta.parent_slot, 0); - if num_entries % 2 == 0 { + assert_eq!(meta.consumed, num_entries); + assert_eq!(meta.parent_slot, parent_slot); assert_eq!(meta.last_index, num_entries - 1); - } else { - assert_eq!(meta.last_index, std::u64::MAX); } - - blocktree.write_blobs(blobs.iter().step_by(2)).unwrap(); - - assert_eq!( - blocktree.get_slot_entries(0, 0, None).unwrap(), - original_entries, - ); - - let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); - assert_eq!(meta.received, num_entries); - assert_eq!(meta.consumed, num_entries); - assert_eq!(meta.parent_slot, 0); - assert_eq!(meta.last_index, 
num_entries - 1); } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -2665,7 +2560,6 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } - #[cfg(feature = "erasure")] mod erasure { use super::*; use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; @@ -2730,7 +2624,7 @@ pub mod tests { assert_eq!(erasure_meta.data, 0x00FF); assert_eq!(erasure_meta.coding, 0x0); - let mut coding_generator = CodingGenerator::new(); + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); for shared_coding_blob in coding_blobs { @@ -2749,6 +2643,23 @@ pub mod tests { assert_eq!(erasure_meta.data, 0xFFFF); assert_eq!(erasure_meta.coding, 0x0F); + + let (start_idx, coding_end_idx) = + (erasure_meta.start_index(), erasure_meta.end_indexes().1); + + for idx in start_idx..coding_end_idx { + blocktree.delete_coding_blob(slot, idx).unwrap(); + } + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 0)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); + assert_eq!(erasure_meta.data, 0xFFFF); + assert_eq!(erasure_meta.coding, 0x0); } #[test] @@ -2766,11 +2677,12 @@ pub mod tests { .map(Blob::into) .collect::>(); - let mut coding_generator = CodingGenerator::new(); + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { let focused_index = (set_index + 1) * NUM_DATA - 1; let coding_blobs = coding_generator.next(&data_blobs); + assert_eq!(coding_blobs.len(), NUM_CODING); let deleted_data = data_blobs[NUM_DATA - 1].clone(); @@ -2821,13 +2733,12 @@ pub mod tests { Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction"); } - /// FIXME: JERASURE Threading: see Issue - /// [#3725](https://github.com/solana-labs/solana/issues/3725) #[test] fn test_recovery_multi_slot_multi_thread() { + use rand::rngs::SmallRng; + use rand::SeedableRng; use std::thread; - const USE_THREADS: bool = true; let slots = vec![0, 3, 5, 50, 100]; let max_erasure_sets = 16; solana_logger::setup(); @@ -2837,7 +2748,7 @@ pub mod tests { // Specification should generate a ledger where each slot has an random number of // erasure sets. 
Odd erasure sets will have all data blobs and no coding blobs, and even ones - // will have between 1-4 data blobs missing and all coding blobs + // will have between 1 data blob missing and 1 coding blob let specs = slots .iter() .map(|&slot| { @@ -2848,7 +2759,7 @@ pub mod tests { let (num_data, num_coding) = if set_index % 2 == 0 { (NUM_DATA - rng.gen_range(1, 5), NUM_CODING) } else { - (NUM_DATA, 0) + (NUM_DATA - 1, NUM_CODING - 1) }; ErasureSpec { set_index, @@ -2873,35 +2784,60 @@ pub mod tests { for slot_model in model.clone() { let blocktree = Arc::clone(&blocktree); let slot = slot_model.slot; - let closure = move || { + let mut rng = SmallRng::from_rng(&mut rng).unwrap(); + let handle = thread::spawn(move || { for erasure_set in slot_model.chunks { - blocktree - .write_shared_blobs(erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - - for shared_coding_blob in erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; + // for even sets, write data blobs first, then write coding blobs, which + // should trigger recovery since all coding blobs will be inserted and + // between 1-4 data blobs are missing + if rng.gen() { blocktree - .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - } - }; + .write_shared_blobs(erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); - if USE_THREADS { - handles.push(thread::spawn(closure)); - } else { - closure(); - } + for shared_coding_blob in erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } else { + // for odd sets, write coding blobs first, then write the data blobs. 
+ // writing the data blobs should trigger recovery, since 3/4 coding and + // 15/16 data blobs will be present + for shared_coding_blob in erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + + blocktree + .write_shared_blobs(erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } + } + }); + + handles.push(handle); } handles @@ -2926,7 +2862,7 @@ pub mod tests { ); // all possibility for recovery should be exhausted - assert!(!erasure_meta.can_recover()); + assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); // Should have all data assert_eq!(erasure_meta.data, 0xFFFF); if set_index % 2 == 0 { diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 4bdb8ad2bc..365673792a 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -28,7 +28,6 @@ pub mod columns { /// Data Column pub struct Data; - #[cfg(feature = "erasure")] #[derive(Debug)] /// The erasure meta column pub struct ErasureMeta; diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index d5cc0784e8..a9eee4137e 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -138,7 +138,6 @@ impl TypedColumn for cf::SlotMeta { type Type = super::SlotMeta; } -#[cfg(feature = "erasure")] impl Column for cf::ErasureMeta { const NAME: &'static str = super::ERASURE_META_CF; type Index = (u64, u64); @@ -157,7 +156,6 @@ impl Column for cf::ErasureMeta { } } -#[cfg(feature = "erasure")] impl TypedColumn for cf::ErasureMeta { type Type = super::ErasureMeta; } diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 824d468c93..e90f5077e7 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "erasure")] use crate::erasure::{NUM_CODING, NUM_DATA}; #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] @@ -59,7 +58,6 @@ impl SlotMeta { } } -#[cfg(feature = "erasure")] #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] /// Erasure coding information pub struct ErasureMeta { @@ -71,7 +69,13 @@ pub struct ErasureMeta { pub coding: u64, } -#[cfg(feature = "erasure")] +#[derive(Debug, PartialEq)] +pub enum ErasureMetaStatus { + CanRecover, + DataFull, + StillNeed(usize), +} + impl ErasureMeta { pub fn new(set_index: u64) -> ErasureMeta { ErasureMeta { @@ -81,46 +85,71 @@ impl ErasureMeta { } } - pub fn can_recover(&self) -> bool { + pub fn status(&self) -> ErasureMetaStatus { let (data_missing, coding_missing) = ( NUM_DATA - self.data.count_ones() as usize, NUM_CODING - self.coding.count_ones() as usize, ); - - data_missing > 0 && data_missing + coding_missing <= NUM_CODING + if data_missing > 0 && data_missing + coding_missing <= NUM_CODING { + ErasureMetaStatus::CanRecover + } else if data_missing == 0 { + ErasureMetaStatus::DataFull + } else { + ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING) + } } pub fn is_coding_present(&self, index: u64) -> bool { - let set_index = Self::set_index_for(index); - let position = index - self.start_index(); + let start = self.start_index(); + let end = start + NUM_CODING as u64; - set_index == self.set_index && 
self.coding & (1 << position) != 0 + if start <= index && index < end { + let position = index - start; + + self.coding & (1 << position) != 0 + } else { + false + } } - pub fn set_coding_present(&mut self, index: u64) { + pub fn set_coding_present(&mut self, index: u64, present: bool) { let set_index = Self::set_index_for(index); if set_index as u64 == self.set_index { let position = index - self.start_index(); - self.coding |= 1 << position; + if present { + self.coding |= 1 << position; + } else { + self.coding &= !(1 << position); + } } } pub fn is_data_present(&self, index: u64) -> bool { - let set_index = Self::set_index_for(index); - let position = index - self.start_index(); + let start = self.start_index(); + let end = start + NUM_DATA as u64; - set_index == self.set_index && self.data & (1 << position) != 0 + if start <= index && index < end { + let position = index - start; + + self.data & (1 << position) != 0 + } else { + false + } } - pub fn set_data_present(&mut self, index: u64) { + pub fn set_data_present(&mut self, index: u64, present: bool) { let set_index = Self::set_index_for(index); if set_index as u64 == self.set_index { let position = index - self.start_index(); - self.data |= 1 << position; + if present { + self.data |= 1 << position; + } else { + self.data &= !(1 << position); + } } } @@ -139,7 +168,29 @@ impl ErasureMeta { } } -#[cfg(feature = "erasure")] +#[test] +fn test_meta_indexes() { + use rand::{thread_rng, Rng}; + + let mut rng = thread_rng(); + + for _ in 0..100 { + let set_index = rng.gen_range(0, 1_000); + let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16); + + assert_eq!(set_index, ErasureMeta::set_index_for(blob_index)); + let e_meta = ErasureMeta::new(set_index); + + assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64); + let (data_end_idx, coding_end_idx) = e_meta.end_indexes(); + assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64); + assert_eq!( + coding_end_idx, + set_index * NUM_DATA as u64 + NUM_CODING as u64 + ); + } +} + #[test] fn test_meta_coding_present() { let set_index = 0; @@ -150,7 +201,7 @@ fn test_meta_coding_present() { }; for i in 0..NUM_CODING as u64 { - e_meta.set_coding_present(i); + e_meta.set_coding_present(i, true); assert_eq!(e_meta.is_coding_present(i), true); } for i in NUM_CODING as u64..NUM_DATA as u64 { @@ -160,7 +211,7 @@ fn test_meta_coding_present() { e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64); for i in (NUM_DATA * 17) as u64..((NUM_DATA * 17) + NUM_CODING) as u64 { - e_meta.set_coding_present(i); + e_meta.set_coding_present(i, true); assert_eq!(e_meta.is_coding_present(i), true); } for i in (NUM_DATA * 17 + NUM_CODING) as u64..((NUM_DATA * 17) + NUM_DATA) as u64 { @@ -168,9 +219,8 @@ fn test_meta_coding_present() { } } -#[cfg(feature = "erasure")] #[test] -fn test_can_recover() { +fn test_erasure_meta_status() { let set_index = 0; let mut e_meta = ErasureMeta { set_index, @@ -178,36 +228,63 @@ fn test_can_recover() { coding: 0, }; - assert!(!e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA)); e_meta.data = 0b1111_1111_1111_1111; e_meta.coding = 0x00; - assert!(!e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); e_meta.coding = 0x0e; - assert_eq!(0x0fu8, 0b0000_1111u8); - assert!(!e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); e_meta.data = 0b0111_1111_1111_1111; - assert!(e_meta.can_recover()); + assert_eq!(e_meta.status(), 
ErasureMetaStatus::CanRecover); e_meta.data = 0b0111_1111_1111_1110; - assert!(e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); e_meta.data = 0b0111_1111_1011_1110; - assert!(e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); e_meta.data = 0b0111_1011_1011_1110; - assert!(!e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1)); e_meta.data = 0b0111_1011_1011_1110; - assert!(!e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1)); e_meta.coding = 0b0000_1110; e_meta.data = 0b1111_1111_1111_1100; - assert!(e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); e_meta.data = 0b1111_1111_1111_1000; - assert!(e_meta.can_recover()); + assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); +} + +#[test] +fn test_meta_data_present() { + let set_index = 0; + let mut e_meta = ErasureMeta { + set_index, + data: 0, + coding: 0, + }; + + for i in 0..NUM_DATA as u64 { + e_meta.set_data_present(i, true); + assert_eq!(e_meta.is_data_present(i), true); + } + for i in NUM_DATA as u64..2 * NUM_DATA as u64 { + assert_eq!(e_meta.is_data_present(i), false); + } + + e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64); + + for i in (NUM_DATA * 23) as u64..(NUM_DATA * 24) as u64 { + e_meta.set_data_present(i, true); + assert_eq!(e_meta.is_data_present(i), true); + } + for i in (NUM_DATA * 22) as u64..(NUM_DATA * 23) as u64 { + assert_eq!(e_meta.is_data_present(i), false); + } } diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index 0f3bdaf526..4d881d935c 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -30,9 +30,7 @@ impl Backend for Rocks { type Error = rocksdb::Error; fn open(path: &Path) -> Result { - #[cfg(feature = "erasure")] - use crate::blocktree::db::columns::ErasureMeta; - use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta}; + use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta}; fs::create_dir_all(&path)?; @@ -43,7 +41,6 @@ impl Backend for Rocks { let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); - #[cfg(feature = "erasure")] let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); @@ -52,7 +49,6 @@ impl Backend for Rocks { meta_cf_descriptor, data_cf_descriptor, erasure_cf_descriptor, - #[cfg(feature = "erasure")] erasure_meta_cf_descriptor, orphans_cf_descriptor, ]; @@ -64,13 +60,10 @@ impl Backend for Rocks { } fn columns(&self) -> Vec<&'static str> { - #[cfg(feature = "erasure")] - use crate::blocktree::db::columns::ErasureMeta; - use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta}; + use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta}; vec![ Coding::NAME, - #[cfg(feature = "erasure")] ErasureMeta::NAME, Data::NAME, Orphans::NAME, @@ -196,7 +189,6 @@ impl TypedColumn for cf::SlotMeta { type Type = super::SlotMeta; } -#[cfg(feature = "erasure")] impl Column for cf::ErasureMeta { const NAME: &'static str = super::ERASURE_META_CF; type Index = (u64, u64); @@ -216,7 +208,6 @@ impl Column for cf::ErasureMeta { } } -#[cfg(feature = "erasure")] 
impl TypedColumn for cf::ErasureMeta { type Type = super::ErasureMeta; } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index c23227dc16..32fe2ef404 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -3,7 +3,6 @@ use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; use crate::entry::{EntrySender, EntrySlice}; -#[cfg(feature = "erasure")] use crate::erasure::CodingGenerator; use crate::packet::index_blobs; use crate::poh_recorder::WorkingBankEntries; @@ -29,8 +28,6 @@ pub enum BroadcastStageReturnType { struct Broadcast { id: Pubkey, - - #[cfg(feature = "erasure")] coding_generator: CodingGenerator, } @@ -119,7 +116,6 @@ impl Broadcast { blocktree.write_shared_blobs(&blobs)?; - #[cfg(feature = "erasure")] let coding = self.coding_generator.next(&blobs); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); @@ -129,14 +125,10 @@ impl Broadcast { // Send out data ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; - #[cfg(feature = "erasure")] - ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; - inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - // generate and transmit any erasure coding blobs. if erasure isn't supported, just send everything again - #[cfg(not(feature = "erasure"))] - ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; + // send out erasures + ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); @@ -194,11 +186,11 @@ impl BroadcastStage { storage_entry_sender: EntrySender, ) -> BroadcastStageReturnType { let me = cluster_info.read().unwrap().my_data().clone(); + let coding_generator = CodingGenerator::default(); let mut broadcast = Broadcast { id: me.id, - #[cfg(feature = "erasure")] - coding_generator: CodingGenerator::new(), + coding_generator, }; loop { @@ -284,9 +276,9 @@ mod test { use crate::entry::create_ticks; use crate::service::Service; use solana_runtime::bank::Bank; + use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -321,7 +313,9 @@ mod test { let exit_sender = Arc::new(AtomicBool::new(false)); let (storage_sender, _receiver) = channel(); - let bank = Arc::new(Bank::default()); + + let (genesis_block, _) = GenesisBlock::new(10_000); + let bank = Arc::new(Bank::new(&genesis_block)); // Start up the broadcast stage let broadcast_service = BroadcastStage::new( @@ -341,15 +335,13 @@ mod test { } #[test] - #[ignore] - //TODO this test won't work since broadcast stage no longer edits the ledger fn test_broadcast_ledger() { + solana_logger::setup(); let ledger_path = get_tmp_ledger_path("test_broadcast_ledger"); + { // Create the leader scheduler let leader_keypair = Keypair::new(); - let start_tick_height = 0; - let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT; let (entry_sender, entry_receiver) = channel(); let broadcast_service = setup_dummy_broadcast_service( @@ -358,6 +350,9 @@ mod test { entry_receiver, ); let bank = broadcast_service.bank.clone(); + let start_tick_height = bank.tick_height(); + let max_tick_height = bank.max_tick_height(); + let ticks_per_slot = bank.ticks_per_slot(); let ticks = 
create_ticks(max_tick_height - start_tick_height, Hash::default()); for (i, tick) in ticks.into_iter().enumerate() { @@ -367,15 +362,23 @@ mod test { } sleep(Duration::from_millis(2000)); + + trace!( + "[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}", + max_tick_height, + start_tick_height, + ticks_per_slot, + ); + let blocktree = broadcast_service.blocktree; let mut blob_index = 0; for i in 0..max_tick_height - start_tick_height { - let slot = (start_tick_height + i + 1) / DEFAULT_TICKS_PER_SLOT; + let slot = (start_tick_height + i + 1) / ticks_per_slot; let result = blocktree.get_data_blob(slot, blob_index).unwrap(); blob_index += 1; - assert!(result.is_some()); + result.expect("expect blob presence"); } drop(entry_sender); diff --git a/core/src/erasure.rs b/core/src/erasure.rs index b7fdffc2b1..d79889306f 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -1,278 +1,217 @@ -// Support erasure coding -use crate::packet::{Blob, SharedBlob}; -use crate::result::{Error, Result}; +//! # Erasure Coding and Recovery +//! +//! Blobs are logically grouped into erasure sets or blocks. Each set contains 16 sequential data +//! blobs and 4 sequential coding blobs. +//! +//! Coding blobs in each set starting from `start_idx`: +//! For each erasure set: +//! generate `NUM_CODING` coding_blobs. +//! index the coding blobs from `start_idx` to `start_idx + NUM_CODING - 1`. +//! +//! model of an erasure set, with top row being data blobs and second being coding +//! |<======================= NUM_DATA ==============================>| +//! |<==== NUM_CODING ===>| +//! +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +//! | D | | D | | D | | D | | D | | D | | D | | D | | D | | D | +//! +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+ +//! | C | | C | | C | | C | | | | | | | | | | | | | +//! +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +//! +//! blob structure for coding blobs +//! +//! + ------- meta is set and used by transport, meta.size is actual length +//! | of data in the byte array blob.data +//! | +//! | + -- data is stuff shipped over the wire, and has an included +//! | | header +//! V V +//! +----------+------------------------------------------------------------+ +//! | meta | data | +//! |+---+-- |+---+---+---+---+------------------------------------------+| +//! || s | . || i | | f | s | || +//! || i | . || n | i | l | i | || +//! || z | . || d | d | a | z | blob.data(), or blob.data_mut() || +//! || e | || e | | g | e | || +//! |+---+-- || x | | s | | || +//! | |+---+---+---+---+------------------------------------------+| +//! +----------+------------------------------------------------------------+ +//! | |<=== coding blob part for "coding" =======>| +//! | | +//! |<============== data blob part for "coding" ==============>| +//! +//! +use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; +use crate::result::Result; use std::cmp; +use std::convert::AsMut; use std::sync::{Arc, RwLock}; +use reed_solomon_erasure::ReedSolomon; + //TODO(sakridge) pick these values -pub const NUM_DATA: usize = 16; // number of data blobs -pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing -pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs +/// Number of data blobs +pub const NUM_DATA: usize = 16; +/// Number of coding blobs; also the maximum number that can go missing. 
+pub const NUM_CODING: usize = 4; +/// Total number of blobs in an erasure set; includes data and coding blobs +pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; -macro_rules! align { - ($x:expr, $align:expr) => { - $x + ($align - 1) & !($align - 1) - }; -} +/// Represents an erasure "session" with a particular configuration and number of data and coding +/// blobs +#[derive(Debug, Clone)] +pub struct Session(ReedSolomon); -#[derive(Debug, PartialEq, Eq)] -pub enum ErasureError { - NotEnoughBlocksToDecode, - DecodeError, - EncodeError, - InvalidBlockSize, - InvalidBlobData, - CorruptCoding, -} - -// k = number of data devices -// m = number of coding devices -// w = word size - -extern "C" { - fn jerasure_matrix_encode( - k: i32, - m: i32, - w: i32, - matrix: *const i32, - data_ptrs: *const *const u8, - coding_ptrs: *const *mut u8, - size: i32, - ); - fn jerasure_matrix_decode( - k: i32, - m: i32, - w: i32, - matrix: *const i32, - row_k_ones: i32, - erasures: *const i32, - data_ptrs: *const *mut u8, - coding_ptrs: *const *mut u8, - size: i32, - ) -> i32; - fn galois_single_divide(a: i32, b: i32, w: i32) -> i32; - fn galois_init_default_field(w: i32) -> i32; -} - -use std::sync::Once; -static ERASURE_W_ONCE: Once = Once::new(); - -// jerasure word size of 32 -fn w() -> i32 { - let w = 32; - unsafe { - ERASURE_W_ONCE.call_once(|| { - galois_init_default_field(w); - () - }); - } - w -} - -// jerasure checks that arrays are a multiple of w()/8 in length -fn wb() -> usize { - (w() / 8) as usize -} - -fn get_matrix(m: i32, k: i32, w: i32) -> Vec { - let mut matrix = vec![0; (m * k) as usize]; - for i in 0..m { - for j in 0..k { - unsafe { - matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w); - } - } - } - matrix -} - -// Generate coding blocks into coding -// There are some alignment restrictions, blocks should be aligned by 16 bytes -// which means their size should be >= 16 bytes -fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> { - if data.is_empty() { - return Ok(()); - } - let k = data.len() as i32; - let m = coding.len() as i32; - let block_len = data[0].len() as i32; - let matrix: Vec = get_matrix(m, k, w()); - let mut data_arg = Vec::with_capacity(data.len()); - for block in data { - if block_len != block.len() as i32 { - error!( - "data block size incorrect {} expected {}", - block.len(), - block_len - ); - return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); - } - data_arg.push(block.as_ptr()); - } - let mut coding_arg = Vec::with_capacity(coding.len()); - for block in coding { - if block_len != block.len() as i32 { - error!( - "coding block size incorrect {} expected {}", - block.len(), - block_len - ); - return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); - } - coding_arg.push(block.as_mut_ptr()); - } - - unsafe { - jerasure_matrix_encode( - k, - m, - w(), - matrix.as_ptr(), - data_arg.as_ptr(), - coding_arg.as_ptr(), - block_len, - ); - } - Ok(()) -} - -// Recover data + coding blocks into data blocks -// data: array of blocks to recover into -// coding: arry of coding blocks -// erasures: list of indices in data where blocks should be recovered -pub fn decode_blocks( - data: &mut [&mut [u8]], - coding: &mut [&mut [u8]], - erasures: &[i32], -) -> Result<()> { - if data.is_empty() { - return Ok(()); - } - let block_len = data[0].len(); - let matrix: Vec = get_matrix(coding.len() as i32, data.len() as i32, w()); - - // generate coding pointers, blocks should be the same size - let mut coding_arg: 
Vec<*mut u8> = Vec::new(); - for x in coding.iter_mut() { - if x.len() != block_len { - return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); - } - coding_arg.push(x.as_mut_ptr()); - } - - // generate data pointers, blocks should be the same size - let mut data_arg: Vec<*mut u8> = Vec::new(); - for x in data.iter_mut() { - if x.len() != block_len { - return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); - } - data_arg.push(x.as_mut_ptr()); - } - let ret = unsafe { - jerasure_matrix_decode( - data.len() as i32, - coding.len() as i32, - w(), - matrix.as_ptr(), - 0, - erasures.as_ptr(), - data_arg.as_ptr(), - coding_arg.as_ptr(), - data[0].len() as i32, - ) - }; - trace!("jerasure_matrix_decode ret: {}", ret); - for x in data[erasures[0] as usize][0..8].iter() { - trace!("{} ", x) - } - trace!(""); - if ret < 0 { - return Err(Error::ErasureError(ErasureError::DecodeError)); - } - Ok(()) -} - -// Generate coding blocks in window starting from start_idx, -// for num_blobs.. For each block place the coding blobs -// at the start of the block like so: -// -// model of an erasure set, with top row being data blobs and second being coding -// |<======================= NUM_DATA ==============================>| -// |<==== NUM_CODING ===>| -// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ -// | D | | D | | D | | D | | D | | D | | D | | D | | D | | D | -// +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+ -// | C | | C | | C | | C | | | | | | | | | | | | | -// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ -// -// blob structure for coding, recover -// -// + ------- meta is set and used by transport, meta.size is actual length -// | of data in the byte array blob.data -// | -// | + -- data is stuff shipped over the wire, and has an included -// | | header -// V V -// +----------+------------------------------------------------------------+ -// | meta | data | -// |+---+-- |+---+---+---+---+------------------------------------------+| -// || s | . || i | | f | s | || -// || i | . || n | i | l | i | || -// || z | . 
|| d | d | a | z | blob.data(), or blob.data_mut() || -// || e | || e | | g | e | || -// |+---+-- || x | | s | | || -// | |+---+---+---+---+------------------------------------------+| -// +----------+------------------------------------------------------------+ -// | |<=== coding blob part for "coding" =======>| -// | | -// |<============== data blob part for "coding" ==============>| -// -// -// +/// Generates coding blobs on demand given data blobs +#[derive(Debug, Clone)] pub struct CodingGenerator { - leftover: Vec, // SharedBlobs that couldn't be used in last call to next() + /// SharedBlobs that couldn't be used in last call to next() + leftover: Vec, + session: Arc, } -impl Default for CodingGenerator { - fn default() -> Self { - CodingGenerator { - leftover: Vec::with_capacity(NUM_DATA), +impl Session { + pub fn new(data_count: usize, coding_count: usize) -> Result { + let rs = ReedSolomon::new(data_count, coding_count)?; + + Ok(Session(rs)) + } + + /// Create coding blocks by overwriting `parity` + pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> { + self.0.encode_sep(data, parity)?; + + Ok(()) + } + + /// Recover data + coding blocks into data blocks + /// # Arguments + /// * `data` - array of data blocks to recover into + /// * `coding` - array of coding blocks + /// * `erasures` - list of indices in data where blocks should be recovered + pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> { + self.0.reconstruct(blocks, present)?; + + Ok(()) + } + + /// Returns `(number_of_data_blobs, number_of_coding_blobs)` + pub fn dimensions(&self) -> (usize, usize) { + (self.0.data_shard_count(), self.0.parity_shard_count()) + } + + /// Reconstruct any missing blobs in this erasure set if possible + /// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata + /// Assumes that the user has sliced into the blobs appropriately already. 
else recovery will + /// return an error or garbage data + pub fn reconstruct_blobs( + &self, + blobs: &mut [B], + present: &[bool], + size: usize, + block_start_idx: u64, + slot: u64, + ) -> Result<(Vec, Vec)> + where + B: AsMut<[u8]>, + { + let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect(); + + trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,); + + // Decode the blocks + self.decode_blocks(blocks.as_mut_slice(), &present)?; + + let mut recovered_data = vec![]; + let mut recovered_coding = vec![]; + + let erasures = present + .iter() + .enumerate() + .filter_map(|(i, present)| if *present { None } else { Some(i) }); + + // Create the missing blobs from the reconstructed data + for n in erasures { + let data_size; + let idx; + let first_byte; + + if n < NUM_DATA { + let mut blob = Blob::new(&blocks[n]); + + data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; + idx = n as u64 + block_start_idx; + first_byte = blob.data[0]; + + blob.set_size(data_size); + recovered_data.push(blob); + } else { + let mut blob = Blob::default(); + blob.data_mut()[..size].copy_from_slice(&blocks[n]); + data_size = size; + idx = (n as u64 + block_start_idx) - NUM_DATA as u64; + first_byte = blob.data[0]; + + blob.set_slot(slot); + blob.set_index(idx); + blob.set_size(data_size); + recovered_coding.push(blob); + } + + trace!( + "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}", + n, + idx, + data_size, + first_byte + ); } + + Ok((recovered_data, recovered_coding)) } } impl CodingGenerator { - pub fn new() -> Self { - Self::default() + pub fn new(session: Arc) -> Self { + CodingGenerator { + leftover: Vec::with_capacity(session.0.data_shard_count()), + session, + } } - // must be called with consecutive data blobs from previous invocation - // blobs from a new slot not start halfway through next_data + /// Yields next set of coding blobs, if any. + /// Must be called with consecutive data blobs within a slot. + /// + /// Passing in a slice with the first blob having a new slot will cause internal state to + /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot + /// must be consecutive. + /// + /// If used improperly, it my return garbage coding blobs, but will not give an + /// error. 
pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec { + let (num_data, num_coding) = self.session.dimensions(); let mut next_coding = - Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING); + Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding); - if self.leftover.len() > 0 && next_data.len() > 0 { - if self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() { - self.leftover.clear(); // reset on slot boundaries - } + if !self.leftover.is_empty() + && !next_data.is_empty() + && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() + { + self.leftover.clear(); } + let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect(); - for data_blobs in next_data.chunks(NUM_DATA) { - if data_blobs.len() < NUM_DATA { + for data_blobs in next_data.chunks(num_data) { + if data_blobs.len() < num_data { self.leftover = data_blobs.to_vec(); break; } self.leftover.clear(); - // find max_data_size for the chunk, round length up to a multiple of wb() - let max_data_size = align!( - data_blobs - .iter() - .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)), - wb() - ); + // find max_data_size for the erasure set + let max_data_size = data_blobs + .iter() + .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)); let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); let data_ptrs: Vec<_> = data_locks @@ -280,9 +219,9 @@ impl CodingGenerator { .map(|l| &l.data[..max_data_size]) .collect(); - let mut coding_blobs = Vec::with_capacity(NUM_CODING); + let mut coding_blobs = Vec::with_capacity(num_coding); - for data_blob in &data_locks[..NUM_CODING] { + for data_blob in &data_locks[..num_coding] { let index = data_blob.index(); let slot = data_blob.slot(); let id = data_blob.id(); @@ -305,7 +244,7 @@ impl CodingGenerator { .map(|blob| &mut blob.data_mut()[..max_data_size]) .collect(); - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs) + self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice()) } .is_ok() { @@ -320,12 +259,27 @@ impl CodingGenerator { } } +impl Default for Session { + fn default() -> Session { + Session::new(NUM_DATA, NUM_CODING).unwrap() + } +} + +impl Default for CodingGenerator { + fn default() -> Self { + let session = Session::default(); + CodingGenerator { + leftover: Vec::with_capacity(session.0.data_shard_count()), + session: Arc::new(session), + } + } +} + #[cfg(test)] pub mod test { use super::*; use crate::blocktree::get_tmp_ledger_path; use crate::blocktree::Blocktree; - use crate::entry::{make_tiny_test_entries, EntrySlice}; use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -368,63 +322,63 @@ pub mod test { #[test] fn test_coding() { - let zero_vec = vec![0; 16]; - let mut vs: Vec> = (0..4).map(|i| (i..(16 + i)).collect()).collect(); + const N_DATA: usize = 4; + const N_CODING: usize = 2; + + let session = Session::new(N_DATA, N_CODING).unwrap(); + + let mut vs: Vec> = (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect(); let v_orig: Vec = vs[0].clone(); - let m = 2; - let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect(); + let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect(); - { - let mut coding_blocks_slices: Vec<_> = - coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect(); - let v_slices: Vec<_> = 
vs.iter().map(|x| x.as_slice()).collect(); + let mut coding_blocks_slices: Vec<_> = + coding_blocks.iter_mut().map(Vec::as_mut_slice).collect(); + let v_slices: Vec<_> = vs.iter().map(Vec::as_slice).collect(); + + session + .encode(v_slices.as_slice(), coding_blocks_slices.as_mut_slice()) + .expect("encoding must succeed"); - assert!(generate_coding_blocks( - coding_blocks_slices.as_mut_slice(), - v_slices.as_slice(), - ) - .is_ok()); - } trace!("test_coding: coding blocks:"); for b in &coding_blocks { trace!("test_coding: {:?}", b); } - let erasure: i32 = 1; - let erasures = vec![erasure, -1]; + + let erasure: usize = 1; + let present = &mut [true; N_DATA + N_CODING]; + present[erasure] = false; + let erased = vs[erasure].clone(); + // clear an entry - vs[erasure as usize].copy_from_slice(zero_vec.as_slice()); + vs[erasure as usize].copy_from_slice(&[0; 16]); - { - let mut coding_blocks_slices: Vec<_> = - coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect(); - let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect(); + let mut blocks: Vec<_> = vs + .iter_mut() + .chain(coding_blocks.iter_mut()) + .map(Vec::as_mut_slice) + .collect(); - assert!(decode_blocks( - v_slices.as_mut_slice(), - coding_blocks_slices.as_mut_slice(), - erasures.as_slice(), - ) - .is_ok()); - } + session + .decode_blocks(blocks.as_mut_slice(), present) + .expect("decoding must succeed"); trace!("test_coding: vs:"); for v in &vs { trace!("test_coding: {:?}", v); } assert_eq!(v_orig, vs[0]); + assert_eq!(erased, vs[erasure]); } fn test_toss_and_recover( + session: &Session, data_blobs: &[SharedBlob], coding_blobs: &[SharedBlob], block_start_idx: usize, ) { let size = coding_blobs[0].read().unwrap().size(); - // toss one data and one coding - let erasures: Vec = vec![0, NUM_DATA as i32, -1]; - let mut blobs: Vec = Vec::with_capacity(ERASURE_SET_SIZE); blobs.push(SharedBlob::default()); // empty data, erasure at zero @@ -432,14 +386,23 @@ pub mod test { // skip first blob blobs.push(blob.clone()); } + blobs.push(SharedBlob::default()); // empty coding, erasure at zero for blob in &coding_blobs[1..NUM_CODING] { blobs.push(blob.clone()); } - let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap(); + // toss one data and one coding + let mut present = vec![true; blobs.len()]; + present[0] = false; + present[NUM_DATA] = false; - assert!(!corrupt); + let (recovered_data, recovered_coding) = session + .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0) + .expect("reconstruction must succeed"); + + assert_eq!(recovered_data.len(), 1); + assert_eq!(recovered_coding.len(), 1); assert_eq!( blobs[1].read().unwrap().meta, @@ -450,15 +413,15 @@ pub mod test { data_blobs[block_start_idx + 1].read().unwrap().data() ); assert_eq!( - blobs[0].read().unwrap().meta, + recovered_data[0].meta, data_blobs[block_start_idx].read().unwrap().meta ); assert_eq!( - blobs[0].read().unwrap().data(), + recovered_data[0].data(), data_blobs[block_start_idx].read().unwrap().data() ); assert_eq!( - blobs[NUM_DATA].read().unwrap().data(), + recovered_coding[0].data(), coding_blobs[0].read().unwrap().data() ); } @@ -468,11 +431,11 @@ pub mod test { solana_logger::setup(); // trivial case - let mut coding_generator = CodingGenerator::new(); + let mut coding_generator = CodingGenerator::default(); let blobs = Vec::new(); for _ in 0..NUM_DATA * 2 { let coding = coding_generator.next(&blobs); - assert_eq!(coding.len(), 0); + assert!(coding.is_empty()); } // test coding by 
iterating one blob at a time @@ -480,6 +443,7 @@ pub mod test { for (i, blob) in data_blobs.iter().cloned().enumerate() { let coding_blobs = coding_generator.next(&[blob]); + if !coding_blobs.is_empty() { assert_eq!(i % NUM_DATA, NUM_DATA - 1); assert_eq!(coding_blobs.len(), NUM_CODING); @@ -490,7 +454,12 @@ pub mod test { ((i / NUM_DATA) * NUM_DATA + j) as u64 ); } - test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA)); + test_toss_and_recover( + &coding_generator.session, + &data_blobs, + &coding_blobs, + i - (i % NUM_DATA), + ); } } } @@ -499,7 +468,7 @@ pub mod test { fn test_erasure_generate_coding_reset_on_new_slot() { solana_logger::setup(); - let mut coding_generator = CodingGenerator::new(); + let mut coding_generator = CodingGenerator::default(); // test coding by iterating one blob at a time let data_blobs = generate_test_blobs(0, NUM_DATA * 2); @@ -509,13 +478,18 @@ pub mod test { } let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]); - assert_eq!(coding_blobs.len(), 0); + assert!(coding_blobs.is_empty()); let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]); assert_eq!(coding_blobs.len(), NUM_CODING); - test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA); + test_toss_and_recover( + &coding_generator.session, + &data_blobs, + &coding_blobs, + NUM_DATA, + ); } #[test] @@ -571,24 +545,17 @@ pub mod test { } } - /// This test is ignored because if successful, it never stops running. It is useful for - /// dicovering an initialization race-condition in the erasure FFI bindings. If this bug - /// re-emerges, running with `Z_THREADS = N` where `N > 1` should crash fairly rapidly. - #[ignore] #[test] fn test_recovery_with_model() { - use std::env; - use std::sync::{Arc, Mutex}; use std::thread; const MAX_ERASURE_SETS: u64 = 16; - solana_logger::setup(); - let n_threads: usize = env::var("Z_THREADS") - .unwrap_or("1".to_string()) - .parse() - .unwrap(); + const N_THREADS: usize = 2; + const N_SLOTS: u64 = 10; - let specs = (0..).map(|slot| { + solana_logger::setup(); + + let specs = (0..N_SLOTS).map(|slot| { let num_erasure_sets = slot % MAX_ERASURE_SETS; let set_specs = (0..num_erasure_sets) @@ -602,12 +569,12 @@ pub mod test { SlotSpec { slot, set_specs } }); - let decode_mutex = Arc::new(Mutex::new(())); let mut handles = vec![]; + let session = Arc::new(Session::default()); - for i in 0..n_threads { + for i in 0..N_THREADS { let specs = specs.clone(); - let decode_mutex = Arc::clone(&decode_mutex); + let session = Arc::clone(&session); let handle = thread::Builder::new() .name(i.to_string()) @@ -617,55 +584,39 @@ pub mod test { let erased_coding = erasure_set.coding[0].clone(); let erased_data = erasure_set.data[..3].to_vec(); - let mut data = Vec::with_capacity(NUM_DATA); - let mut coding = Vec::with_capacity(NUM_CODING); - let erasures = vec![0, 1, 2, NUM_DATA as i32, -1]; + let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); - data.push(SharedBlob::default()); - data.push(SharedBlob::default()); - data.push(SharedBlob::default()); + blobs.push(SharedBlob::default()); + blobs.push(SharedBlob::default()); + blobs.push(SharedBlob::default()); for blob in erasure_set.data.into_iter().skip(3) { - data.push(blob); + blobs.push(blob); } - coding.push(SharedBlob::default()); + blobs.push(SharedBlob::default()); for blob in erasure_set.coding.into_iter().skip(1) { - coding.push(blob); + blobs.push(blob); } - let size = erased_coding.read().unwrap().data_size() as usize; + let size = erased_coding.read().unwrap().size() as usize; 
- let mut data_locks: Vec<_> = - data.iter().map(|shared| shared.write().unwrap()).collect(); - let mut coding_locks: Vec<_> = coding - .iter() - .map(|shared| shared.write().unwrap()) - .collect(); + let mut present = vec![true; ERASURE_SET_SIZE]; + present[0] = false; + present[1] = false; + present[2] = false; + present[NUM_DATA] = false; - let mut data_ptrs: Vec<_> = data_locks - .iter_mut() - .map(|blob| &mut blob.data[..size]) - .collect(); - let mut coding_ptrs: Vec<_> = coding_locks - .iter_mut() - .map(|blob| &mut blob.data_mut()[..size]) - .collect(); - - { - let _lock = decode_mutex.lock(); - - decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, + session + .reconstruct_shared_blobs( + &mut blobs, + &present, + size, + erasure_set.set_index * NUM_DATA as u64, + slot_model.slot, ) - .expect("decoding must succeed"); - } + .expect("reconstruction must succeed"); - drop(coding_locks); - drop(data_locks); - - for (expected, recovered) in erased_data.iter().zip(data.iter()) { + for (expected, recovered) in erased_data.iter().zip(blobs.iter()) { let expected = expected.read().unwrap(); let mut recovered = recovered.write().unwrap(); let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE; @@ -677,7 +628,7 @@ pub mod test { assert_eq!( erased_coding.read().unwrap().data(), - coding[0].read().unwrap().data() + blobs[NUM_DATA].read().unwrap().data() ); debug!("passed set: {}", erasure_set.set_index); @@ -702,7 +653,9 @@ pub mod test { IntoIt: Iterator + Clone + 'a, S: Borrow, { - specs.into_iter().map(|spec| { + let mut coding_generator = CodingGenerator::default(); + + specs.into_iter().map(move |spec| { let spec = spec.borrow(); let slot = spec.slot; @@ -713,7 +666,7 @@ pub mod test { let set_index = erasure_spec.set_index as usize; let start_index = set_index * NUM_DATA; - let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs(); + let mut blobs = generate_test_blobs(0, NUM_DATA); index_blobs( &blobs, &Keypair::new().pubkey(), @@ -722,7 +675,6 @@ pub mod test { 0, ); - let mut coding_generator = CodingGenerator::new(); let mut coding_blobs = coding_generator.next(&blobs); blobs.drain(erasure_spec.num_data..); @@ -770,84 +722,60 @@ pub mod test { blocktree } + // fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool { + // let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); + // + // blobs.iter().enumerate().all(|(i, blob)| { + // let blob = blob.read().unwrap(); + // blob.index() as usize == i + offset && blob.data() == &data[..] 
+ // }) + // } + // fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { - let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs(); + let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); + + let blobs: Vec<_> = (0..num_blobs) + .into_iter() + .map(|_| { + let mut blob = Blob::default(); + blob.data_mut()[..data.len()].copy_from_slice(&data); + blob.set_size(data.len()); + Arc::new(RwLock::new(blob)) + }) + .collect(); index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0); + blobs } - fn decode_blobs( - blobs: &[SharedBlob], - erasures: &[i32], - size: usize, - block_start_idx: u64, - slot: u64, - ) -> Result { - let mut locks = Vec::with_capacity(ERASURE_SET_SIZE); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); + impl Session { + fn reconstruct_shared_blobs( + &self, + blobs: &mut [SharedBlob], + present: &[bool], + size: usize, + block_start_idx: u64, + slot: u64, + ) -> Result<(Vec, Vec)> { + let mut locks: Vec> = blobs + .iter() + .map(|shared_blob| shared_blob.write().unwrap()) + .collect(); - assert_eq!(blobs.len(), ERASURE_SET_SIZE); - for b in blobs { - locks.push(b.write().unwrap()); + let mut slices: Vec<_> = locks + .iter_mut() + .enumerate() + .map(|(i, blob)| { + if i < NUM_DATA { + &mut blob.data[..size] + } else { + &mut blob.data_mut()[..size] + } + }) + .collect(); + + self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot) } - - for (i, l) in locks.iter_mut().enumerate() { - if i < NUM_DATA { - data_ptrs.push(&mut l.data[..size]); - } else { - coding_ptrs.push(&mut l.data_mut()[..size]); - } - } - - // Decode the blocks - decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, - )?; - - // Create the missing blobs from the reconstructed data - let mut corrupt = false; - - for i in &erasures[..erasures.len() - 1] { - let n = *i as usize; - let mut idx = n as u64 + block_start_idx; - - let mut data_size; - if n < NUM_DATA { - data_size = locks[n].data_size() as usize; - data_size -= BLOB_HEADER_SIZE; - if data_size > BLOB_DATA_SIZE { - error!("corrupt data blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } else { - data_size = size; - idx -= NUM_DATA as u64; - locks[n].set_slot(slot); - locks[n].set_index(idx); - - if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { - error!("corrupt coding blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } - - locks[n].set_size(data_size); - trace!( - "erasures[{}] ({}) size: {} data[0]: {}", - *i, - idx, - data_size, - locks[n].data()[0] - ); - } - - Ok(corrupt) } - } diff --git a/core/src/lib.rs b/core/src/lib.rs index 82b28dc75e..c6206781f5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -31,7 +31,6 @@ pub mod cluster; pub mod cluster_info; pub mod cluster_tests; pub mod entry; -#[cfg(feature = "erasure")] pub mod erasure; pub mod fetch_stage; pub mod fullnode; diff --git a/core/src/packet.rs b/core/src/packet.rs index aa69070c05..46d8b453f2 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -372,7 +372,11 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1; impl Blob { pub fn new(data: &[u8]) -> Self { let mut blob = Self::default(); + + assert!(data.len() <= blob.data.len()); + let data_len = cmp::min(data.len(), blob.data.len()); + let bytes = &data[..data_len]; blob.data[..data_len].copy_from_slice(bytes); blob.meta.size = blob.data_size() as usize; @@ -459,8 +463,8 @@ 
impl Blob {
         LittleEndian::read_u64(&self.data[SIZE_RANGE])
     }
 
-    pub fn set_data_size(&mut self, ix: u64) {
-        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], ix);
+    pub fn set_data_size(&mut self, size: u64) {
+        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size);
     }
 
     pub fn data(&self) -> &[u8] {
diff --git a/core/src/result.rs b/core/src/result.rs
index fba0bcefed..05eb4c84d5 100644
--- a/core/src/result.rs
+++ b/core/src/result.rs
@@ -2,8 +2,6 @@
 use crate::blocktree;
 use crate::cluster_info;
-#[cfg(feature = "erasure")]
-use crate::erasure;
 use crate::packet;
 use crate::poh_recorder;
 use bincode;
@@ -25,8 +23,7 @@ pub enum Error {
     TransactionError(transaction::TransactionError),
     ClusterInfoError(cluster_info::ClusterInfoError),
     BlobError(packet::BlobError),
-    #[cfg(feature = "erasure")]
-    ErasureError(erasure::ErasureError),
+    ErasureError(reed_solomon_erasure::Error),
     SendError,
     PohRecorderError(poh_recorder::PohRecorderError),
     BlocktreeError(blocktree::BlocktreeError),
@@ -67,9 +64,8 @@ impl std::convert::From<cluster_info::ClusterInfoError> for Error {
         Error::ClusterInfoError(e)
     }
 }
-#[cfg(feature = "erasure")]
-impl std::convert::From<erasure::ErasureError> for Error {
-    fn from(e: erasure::ErasureError) -> Error {
+impl std::convert::From<reed_solomon_erasure::Error> for Error {
+    fn from(e: reed_solomon_erasure::Error) -> Error {
        Error::ErasureError(e)
    }
}
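
For reference, the data/coding round trip that `test_coding` exercises condenses to the short sketch below. This is a minimal sketch, not part of the commit itself: it assumes the `Session` wrapper introduced by this change (a thin layer over the `reed-solomon-erasure` 3.1.1 crate), with `Session::new`, `encode`, and `decode_blocks` accepting arguments exactly as the patch's tests pass them, and it assumes a `solana::erasure::Session` import path.

use solana::erasure::Session; // path assumed; Session wraps reed-solomon-erasure's ReedSolomon

fn main() {
    // One erasure set in this sketch: 4 data shards + 2 coding shards, 16 bytes each.
    const N_DATA: usize = 4;
    const N_CODING: usize = 2;
    const SHARD_LEN: usize = 16;

    let session = Session::new(N_DATA, N_CODING).expect("valid dimensions");

    // Fill the data shards with recognizable bytes and keep a copy of one of them.
    let mut data: Vec<Vec<u8>> = (0..N_DATA as u8)
        .map(|i| (i..i + SHARD_LEN as u8).collect())
        .collect();
    let original = data[1].clone();

    // Produce the coding (parity) shards from the data shards.
    let mut coding: Vec<Vec<u8>> = vec![vec![0u8; SHARD_LEN]; N_CODING];
    {
        let data_refs: Vec<&[u8]> = data.iter().map(Vec::as_slice).collect();
        let mut coding_refs: Vec<&mut [u8]> =
            coding.iter_mut().map(Vec::as_mut_slice).collect();
        session
            .encode(&data_refs, coding_refs.as_mut_slice())
            .expect("encoding must succeed");
    }

    // "Lose" data shard 1, then reconstruct it in place from the survivors.
    data[1].iter_mut().for_each(|b| *b = 0);
    let mut present = [true; N_DATA + N_CODING];
    present[1] = false;

    let mut blocks: Vec<&mut [u8]> = data
        .iter_mut()
        .chain(coding.iter_mut())
        .map(Vec::as_mut_slice)
        .collect();
    session
        .decode_blocks(blocks.as_mut_slice(), &present)
        .expect("reconstruction must succeed");

    assert_eq!(data[1], original);
}

The same constraint drives `CodingGenerator::next`: every slice handed to `encode` must have the same length, which is why each blob in an erasure set is truncated to the largest `meta.size` in that set before encoding.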