From 596f611ede4018b5ccfcff533b1464c3dcbef946 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 17 Apr 2019 18:04:30 -0700 Subject: [PATCH] Revert "revert-revert-erasure and erasure fixes (#3833)" (#3855) This reverts commit 6bef16a6a1d96bd7c6127cd30f78c2b2fdbf40d1. --- Cargo.lock | 13 - core/Cargo.toml | 1 - core/build.rs | 29 +- core/src/blocktree.rs | 462 ++++++++++++---------- core/src/blocktree/db.rs | 1 + core/src/blocktree/kvs.rs | 2 + core/src/blocktree/meta.rs | 141 ++----- core/src/blocktree/rocks.rs | 13 +- core/src/broadcast_stage.rs | 43 +- core/src/erasure.rs | 762 ++++++++++++++++++++---------------- core/src/lib.rs | 1 + core/src/packet.rs | 8 +- core/src/result.rs | 10 +- 13 files changed, 784 insertions(+), 702 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 255ed363c..ad0bba7b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1877,17 +1877,6 @@ dependencies = [ "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "reed-solomon-erasure" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cc 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "regex" version = "1.1.2" @@ -2181,7 +2170,6 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", - "reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)", "ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3585,7 +3573,6 @@ dependencies = [ "checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828" -"checksum reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77cbbd4c02f53e345fe49e74255a1b10080731ffb2a03475e11df7fc8a043c37" "checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f" "checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" diff --git a/core/Cargo.toml b/core/Cargo.toml index d4102436c..ec49f960f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -40,7 +40,6 @@ nix = "0.13.0" rand = "0.6.5" rand_chacha = "0.1.1" rayon = "1.0.0" -reed-solomon-erasure = "3.1.1" reqwest = "0.9.11" ring = "0.13.2" rocksdb = "0.11.0" diff --git a/core/build.rs b/core/build.rs index 7c8cfebb3..4a3800377 100644 --- 
a/core/build.rs +++ b/core/build.rs @@ -24,8 +24,9 @@ fn main() { let chacha = !env::var("CARGO_FEATURE_CHACHA").is_err(); let cuda = !env::var("CARGO_FEATURE_CUDA").is_err(); + let erasure = !env::var("CARGO_FEATURE_ERASURE").is_err(); - if chacha || cuda { + if chacha || cuda || erasure { println!("cargo:rerun-if-changed={}", perf_libs_dir); println!("cargo:rustc-link-search=native={}", perf_libs_dir); } @@ -45,4 +46,30 @@ fn main() { println!("cargo:rustc-link-lib=dylib=cuda"); println!("cargo:rustc-link-lib=dylib=cudadevrt"); } + if erasure { + #[cfg(any(target_os = "macos", target_os = "ios"))] + { + println!( + "cargo:rerun-if-changed={}/libgf_complete.dylib", + perf_libs_dir + ); + println!("cargo:rerun-if-changed={}/libJerasure.dylib", perf_libs_dir); + } + #[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))] + { + println!("cargo:rerun-if-changed={}/libgf_complete.so", perf_libs_dir); + println!("cargo:rerun-if-changed={}/libJerasure.so", perf_libs_dir); + } + #[cfg(windows)] + { + println!( + "cargo:rerun-if-changed={}/libgf_complete.dll", + perf_libs_dir + ); + println!("cargo:rerun-if-changed={}/libJerasure.dll", perf_libs_dir); + } + + println!("cargo:rustc-link-lib=dylib=Jerasure"); + println!("cargo:rustc-link-lib=dylib=gf_complete"); + } } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 3f341e182..3fdfd39f2 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -3,6 +3,7 @@ //! access read to a persistent file-based ledger. use crate::entry::Entry; +#[cfg(feature = "erasure")] use crate::erasure; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; @@ -16,6 +17,7 @@ use hashbrown::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; +#[cfg(feature = "erasure")] use solana_metrics::counter::Counter; use solana_sdk::genesis_block::GenesisBlock; @@ -77,9 +79,9 @@ pub struct Blocktree { meta_cf: LedgerColumn, data_cf: LedgerColumn, erasure_cf: LedgerColumn, + #[cfg(feature = "erasure")] erasure_meta_cf: LedgerColumn, orphans_cf: LedgerColumn, - session: Arc, pub new_blobs_signals: Vec>, pub root_slot: RwLock, } @@ -90,6 +92,7 @@ pub const META_CF: &str = "meta"; pub const DATA_CF: &str = "data"; // Column family for erasure data pub const ERASURE_CF: &str = "erasure"; +#[cfg(feature = "erasure")] pub const ERASURE_META_CF: &str = "erasure_meta"; // Column family for orphans data pub const ORPHANS_CF: &str = "orphans"; @@ -113,7 +116,7 @@ impl Blocktree { // Create the erasure column family let erasure_cf = LedgerColumn::new(&db); - + #[cfg(feature = "erasure")] let erasure_meta_cf = LedgerColumn::new(&db); // Create the orphans column family. 
An "orphan" is defined as @@ -121,17 +124,14 @@ impl Blocktree { // known parent let orphans_cf = LedgerColumn::new(&db); - // setup erasure - let session = Arc::new(erasure::Session::default()); - Ok(Blocktree { db, meta_cf, data_cf, erasure_cf, + #[cfg(feature = "erasure")] erasure_meta_cf, orphans_cf, - session, new_blobs_signals: vec![], root_slot: RwLock::new(0), }) @@ -259,6 +259,7 @@ impl Blocktree { // A map from slot to a 2-tuple of metadata: (working copy, backup copy), // so we can detect changes to the slot metadata later let mut slot_meta_working_set = HashMap::new(); + #[cfg(feature = "erasure")] let mut erasure_meta_working_set = HashMap::new(); let new_blobs: Vec<_> = new_blobs.into_iter().collect(); let mut prev_inserted_blob_datas = HashMap::new(); @@ -300,17 +301,20 @@ impl Blocktree { continue; } - let set_index = ErasureMeta::set_index_for(blob.index()); - let erasure_meta_entry = erasure_meta_working_set - .entry((blob_slot, set_index)) - .or_insert_with(|| { - self.erasure_meta_cf - .get((blob_slot, set_index)) - .expect("Expect database get to succeed") - .unwrap_or_else(|| ErasureMeta::new(set_index)) - }); + #[cfg(feature = "erasure")] + { + let set_index = ErasureMeta::set_index_for(blob.index()); + let erasure_meta_entry = erasure_meta_working_set + .entry((blob_slot, set_index)) + .or_insert_with(|| { + self.erasure_meta_cf + .get((blob_slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index)) + }); - erasure_meta_entry.set_data_present(blob.index(), true); + erasure_meta_entry.set_data_present(blob.index()); + } let _ = self.insert_data_blob( blob, @@ -335,8 +339,11 @@ impl Blocktree { } } - for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { - write_batch.put::((*slot, *set_index), erasure_meta)?; + #[cfg(feature = "erasure")] + { + for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { + write_batch.put::((*slot, *set_index), erasure_meta)?; + } } self.db.write(write_batch)?; @@ -347,8 +354,36 @@ impl Blocktree { } } + #[cfg(feature = "erasure")] for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() { - self.try_erasure_recover(&erasure_meta, slot, set_index)?; + if erasure_meta.can_recover() { + match self.recover(slot, set_index) { + Ok(recovered) => { + inc_new_counter_info!("erasures-recovered", recovered); + } + Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => { + let mut erasure_meta = self + .erasure_meta_cf + .get((slot, set_index))? + .expect("erasure meta should exist"); + + let mut batch = self.db.batch()?; + + let start_index = erasure_meta.start_index(); + let (_, coding_end_idx) = erasure_meta.end_indexes(); + + erasure_meta.coding = 0; + batch.put::((slot, set_index), &erasure_meta)?; + + for idx in start_index..coding_end_idx { + batch.delete::((slot, idx))?; + } + + self.db.write(batch)?; + } + Err(e) => return Err(e), + } + } } Ok(()) @@ -418,42 +453,26 @@ impl Blocktree { pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.erasure_cf.get_bytes((slot, index)) } - pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { - let set_index = ErasureMeta::set_index_for(index); - - let mut erasure_meta = self - .erasure_meta_cf - .get((slot, set_index))? 
- .unwrap_or_else(|| ErasureMeta::new(set_index)); - - erasure_meta.set_coding_present(index, false); - - let mut batch = self.db.batch()?; - - batch.delete::((slot, index))?; - batch.put::((slot, set_index), &erasure_meta)?; - - self.db.write(batch)?; - Ok(()) + self.erasure_cf.delete((slot, index)) } - pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.data_cf.get_bytes((slot, index)) } - /// For benchmarks, testing, and setup. - /// Does no metadata tracking. Use with care. - pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.data_cf.put_bytes((slot, index), bytes) - } - pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { self.erasure_cf.put_bytes((slot, index), bytes) } + #[cfg(not(feature = "erasure"))] + #[inline] + pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + self.put_coding_blob_bytes_raw(slot, index, bytes) + } + /// this function will insert coding blobs and also automatically track erasure-related /// metadata. If recovery is available it will be done + #[cfg(feature = "erasure")] pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { let set_index = ErasureMeta::set_index_for(index); let mut erasure_meta = self @@ -461,7 +480,7 @@ impl Blocktree { .get((slot, set_index))? .unwrap_or_else(|| ErasureMeta::new(set_index)); - erasure_meta.set_coding_present(index, true); + erasure_meta.set_coding_present(index); let mut writebatch = self.db.batch()?; @@ -471,26 +490,41 @@ impl Blocktree { self.db.write(writebatch)?; - self.try_erasure_recover(&erasure_meta, slot, set_index) + if erasure_meta.can_recover() { + match self.recover(slot, set_index) { + Ok(recovered) => { + inc_new_counter_info!("erasures-recovered", recovered); + return Ok(()); + } + Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => { + let start_index = erasure_meta.start_index(); + let (_, coding_end_idx) = erasure_meta.end_indexes(); + let mut batch = self.db.batch()?; + + erasure_meta.coding = 0; + batch.put::((slot, set_index), &erasure_meta)?; + + for idx in start_index..coding_end_idx { + batch.delete::((slot, idx as u64))?; + } + + self.db.write(batch)?; + + return Ok(()); + } + Err(e) => return Err(e), + } + } + + Ok(()) } - fn try_erasure_recover( - &self, - erasure_meta: &ErasureMeta, - slot: u64, - set_index: u64, - ) -> Result<()> { - match erasure_meta.status() { - ErasureMetaStatus::CanRecover => { - let recovered = self.recover(slot, set_index)?; - inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered); - } - ErasureMetaStatus::StillNeed(needed) => { - inc_new_counter_info!("blocktree-erasure-blobs_needed", needed) - } - ErasureMetaStatus::DataFull => inc_new_counter_info!("blocktree-erasure-complete", 1), - } - Ok(()) + pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> { + self.data_cf.put_bytes((slot, index), value) + } + + pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + self.data_cf.put_bytes((slot, index), bytes) } pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result> { @@ -592,6 +626,20 @@ impl Blocktree { } } + pub fn find_missing_coding_indexes( + &self, + slot: u64, + start_index: u64, + end_index: u64, + max_missing: usize, + ) -> Vec { + if let Ok(mut db_iterator) = self.erasure_cf.cursor() { + Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing) + } else { + 
vec![] + } + } + /// Returns the entry vector for the slot starting with `blob_start_index` pub fn get_slot_entries( &self, @@ -1040,45 +1088,43 @@ impl Blocktree { Ok(()) } + #[cfg(feature = "erasure")] /// Attempts recovery using erasure coding fn recover(&self, slot: u64, set_index: u64) -> Result { - use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA}; + use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA}; + use crate::packet::BLOB_DATA_SIZE; let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap(); let start_idx = erasure_meta.start_index(); let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); - let present = &mut [true; ERASURE_SET_SIZE]; - let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); + let mut erasures = Vec::with_capacity(NUM_CODING + 1); + let (mut data, mut coding) = (vec![], vec![]); let mut size = 0; for i in start_idx..coding_end_idx { if erasure_meta.is_coding_present(i) { - let mut blob_bytes = self + let blob_bytes = self .erasure_cf .get_bytes((slot, i))? .expect("erasure_meta must have no false positives"); - blob_bytes.drain(..BLOB_HEADER_SIZE); - if size == 0 { - size = blob_bytes.len(); + size = blob_bytes.len() - BLOB_HEADER_SIZE; } - blobs.push(blob_bytes); + coding.push(blob_bytes); } else { - let set_relative_idx = (i - start_idx) as usize + NUM_DATA; - blobs.push(vec![0; size]); - present[set_relative_idx] = false; + let set_relative_idx = (i - start_idx) + NUM_DATA as u64; + coding.push(vec![0; crate::packet::BLOB_SIZE]); + erasures.push(set_relative_idx as i32); } } assert_ne!(size, 0); for i in start_idx..data_end_idx { - let set_relative_idx = (i - start_idx) as usize; - if erasure_meta.is_data_present(i) { let mut blob_bytes = self .data_cf @@ -1086,28 +1132,90 @@ impl Blocktree { .expect("erasure_meta must have no false positives"); // If data is too short, extend it with zeroes - blob_bytes.resize(size, 0u8); + if blob_bytes.len() < size { + blob_bytes.resize(size, 0u8); + } - blobs.insert(set_relative_idx, blob_bytes); + data.push(blob_bytes); } else { - blobs.insert(set_relative_idx, vec![0u8; size]); + let set_relative_index = i - start_idx; + data.push(vec![0; size]); // data erasures must come before any coding erasures if present - present[set_relative_idx] = false; + erasures.insert(0, set_relative_index as i32); } } - let (recovered_data, recovered_coding) = self - .session - .reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; + let mut coding_ptrs: Vec<_> = coding + .iter_mut() + .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size]) + .collect(); - let amount_recovered = recovered_data.len() + recovered_coding.len(); + let mut data_ptrs: Vec<_> = data + .iter_mut() + .map(|data_bytes| &mut data_bytes[..size]) + .collect(); - trace!( - "[recover] reconstruction OK slot: {}, indexes: [{},{})", - slot, - start_idx, - data_end_idx - ); + // Marks the end + erasures.push(-1); + trace!("erasures: {:?}, size: {}", erasures, size); + + erasure::decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + + // Create the missing blobs from the reconstructed data + let block_start_idx = erasure_meta.start_index(); + let (mut recovered_data, mut recovered_coding) = (vec![], vec![]); + + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + + let (data_size, idx, first_byte); + + if n < NUM_DATA { + let mut blob = Blob::new(&data_ptrs[n]); + + idx = n as u64 + block_start_idx; + data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; 
+ first_byte = blob.data[0]; + + if data_size > BLOB_DATA_SIZE { + error!("corrupt data blob[{}] data_size: {}", idx, data_size); + return Err(Error::ErasureError(ErasureError::CorruptCoding)); + } + + blob.set_slot(slot); + blob.set_index(idx); + blob.set_size(data_size); + recovered_data.push(blob); + } else { + let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]); + + idx = (n - NUM_DATA) as u64 + block_start_idx; + data_size = size; + first_byte = blob.data[0]; + + if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { + error!("corrupt coding blob[{}] data_size: {}", idx, data_size); + return Err(Error::ErasureError(ErasureError::CorruptCoding)); + } + + blob.set_slot(slot); + blob.set_index(idx); + blob.set_data_size(data_size as u64); + recovered_coding.push(blob); + } + + trace!( + "erasures[{}] ({}) size: {} data[0]: {}", + *i, + idx, + data_size, + first_byte, + ); + } self.write_blobs(recovered_data)?; @@ -1115,7 +1223,7 @@ impl Blocktree { self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?; } - Ok(amount_recovered) + Ok(erasures.len() - 1) } /// Returns the next consumed index and the number of ticks in the new consumed @@ -1713,47 +1821,44 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive"); { let blocktree = Blocktree::open(&blocktree_path).unwrap(); - for i in 0..4 { - let slot = i; - let parent_slot = if i == 0 { 0 } else { i - 1 }; - // Write entries - let num_entries = 21 as u64 * (i + 1); - let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries); + let slot = 0; + let parent_slot = 0; + // Write entries + let num_entries = 21 as u64; + let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries); - blocktree - .write_blobs(blobs.iter().skip(1).step_by(2)) - .unwrap(); + blocktree + .write_blobs(blobs.iter().skip(1).step_by(2)) + .unwrap(); - assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); + assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); - let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); - if num_entries % 2 == 0 { - assert_eq!(meta.received, num_entries); - } else { - debug!("got here"); - assert_eq!(meta.received, num_entries - 1); - } - assert_eq!(meta.consumed, 0); - assert_eq!(meta.parent_slot, parent_slot); - if num_entries % 2 == 0 { - assert_eq!(meta.last_index, num_entries - 1); - } else { - assert_eq!(meta.last_index, std::u64::MAX); - } - - blocktree.write_blobs(blobs.iter().step_by(2)).unwrap(); - - assert_eq!( - blocktree.get_slot_entries(slot, 0, None).unwrap(), - original_entries, - ); - - let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); + let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); + if num_entries % 2 == 0 { assert_eq!(meta.received, num_entries); - assert_eq!(meta.consumed, num_entries); - assert_eq!(meta.parent_slot, parent_slot); - assert_eq!(meta.last_index, num_entries - 1); + } else { + assert_eq!(meta.received, num_entries - 1); } + assert_eq!(meta.consumed, 0); + assert_eq!(meta.parent_slot, 0); + if num_entries % 2 == 0 { + assert_eq!(meta.last_index, num_entries - 1); + } else { + assert_eq!(meta.last_index, std::u64::MAX); + } + + blocktree.write_blobs(blobs.iter().step_by(2)).unwrap(); + + assert_eq!( + blocktree.get_slot_entries(0, 0, None).unwrap(), + original_entries, + ); + + let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); + assert_eq!(meta.received, num_entries); + assert_eq!(meta.consumed, num_entries); + assert_eq!(meta.parent_slot, 0); + 
assert_eq!(meta.last_index, num_entries - 1); } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -2560,6 +2665,7 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[cfg(feature = "erasure")] mod erasure { use super::*; use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; @@ -2624,7 +2730,7 @@ pub mod tests { assert_eq!(erasure_meta.data, 0x00FF); assert_eq!(erasure_meta.coding, 0x0); - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let mut coding_generator = CodingGenerator::new(); let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); for shared_coding_blob in coding_blobs { @@ -2643,23 +2749,6 @@ pub mod tests { assert_eq!(erasure_meta.data, 0xFFFF); assert_eq!(erasure_meta.coding, 0x0F); - - let (start_idx, coding_end_idx) = - (erasure_meta.start_index(), erasure_meta.end_indexes().1); - - for idx in start_idx..coding_end_idx { - blocktree.delete_coding_blob(slot, idx).unwrap(); - } - - let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 0)) - .expect("DB get must succeed") - .unwrap(); - - assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); - assert_eq!(erasure_meta.data, 0xFFFF); - assert_eq!(erasure_meta.coding, 0x0); } #[test] @@ -2677,12 +2766,11 @@ pub mod tests { .map(Blob::into) .collect::>(); - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let mut coding_generator = CodingGenerator::new(); for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { let focused_index = (set_index + 1) * NUM_DATA - 1; let coding_blobs = coding_generator.next(&data_blobs); - assert_eq!(coding_blobs.len(), NUM_CODING); let deleted_data = data_blobs[NUM_DATA - 1].clone(); @@ -2733,12 +2821,13 @@ pub mod tests { Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction"); } + /// FIXME: JERASURE Threading: see Issue + /// [#3725](https://github.com/solana-labs/solana/issues/3725) #[test] fn test_recovery_multi_slot_multi_thread() { - use rand::rngs::SmallRng; - use rand::SeedableRng; use std::thread; + const USE_THREADS: bool = true; let slots = vec![0, 3, 5, 50, 100]; let max_erasure_sets = 16; solana_logger::setup(); @@ -2748,7 +2837,7 @@ pub mod tests { // Specification should generate a ledger where each slot has an random number of // erasure sets. 
Odd erasure sets will have all data blobs and no coding blobs, and even ones - // will have between 1 data blob missing and 1 coding blob + // will have between 1-4 data blobs missing and all coding blobs let specs = slots .iter() .map(|&slot| { @@ -2759,7 +2848,7 @@ pub mod tests { let (num_data, num_coding) = if set_index % 2 == 0 { (NUM_DATA - rng.gen_range(1, 5), NUM_CODING) } else { - (NUM_DATA - 1, NUM_CODING - 1) + (NUM_DATA, 0) }; ErasureSpec { set_index, @@ -2784,60 +2873,35 @@ pub mod tests { for slot_model in model.clone() { let blocktree = Arc::clone(&blocktree); let slot = slot_model.slot; - let mut rng = SmallRng::from_rng(&mut rng).unwrap(); - let handle = thread::spawn(move || { + let closure = move || { for erasure_set in slot_model.chunks { - // for even sets, write data blobs first, then write coding blobs, which - // should trigger recovery since all coding blobs will be inserted and - // between 1-4 data blobs are missing - if rng.gen() { - blocktree - .write_shared_blobs(erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - - for shared_coding_blob in erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - } else { - // for odd sets, write coding blobs first, then write the data blobs. - // writing the data blobs should trigger recovery, since 3/4 coding and - // 15/16 data blobs will be present - for shared_coding_blob in erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); + blocktree + .write_shared_blobs(erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + for shared_coding_blob in erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; blocktree - .write_shared_blobs(erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); + .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) + .expect("Writing coding blobs must succeed"); } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); } - }); + }; - handles.push(handle); + if USE_THREADS { + handles.push(thread::spawn(closure)); + } else { + closure(); + } } handles @@ -2862,7 +2926,7 @@ pub mod tests { ); // all possibility for recovery should be exhausted - assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); + assert!(!erasure_meta.can_recover()); // Should have all data assert_eq!(erasure_meta.data, 0xFFFF); if set_index % 2 == 0 { diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 365673792..4bdb8ad2b 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -28,6 +28,7 @@ pub mod columns { /// Data Column pub struct Data; + #[cfg(feature = "erasure")] 
#[derive(Debug)] /// The erasure meta column pub struct ErasureMeta; diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index a9eee4137..d5cc0784e 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -138,6 +138,7 @@ impl TypedColumn for cf::SlotMeta { type Type = super::SlotMeta; } +#[cfg(feature = "erasure")] impl Column for cf::ErasureMeta { const NAME: &'static str = super::ERASURE_META_CF; type Index = (u64, u64); @@ -156,6 +157,7 @@ impl Column for cf::ErasureMeta { } } +#[cfg(feature = "erasure")] impl TypedColumn for cf::ErasureMeta { type Type = super::ErasureMeta; } diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index e90f5077e..824d468c9 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "erasure")] use crate::erasure::{NUM_CODING, NUM_DATA}; #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] @@ -58,6 +59,7 @@ impl SlotMeta { } } +#[cfg(feature = "erasure")] #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] /// Erasure coding information pub struct ErasureMeta { @@ -69,13 +71,7 @@ pub struct ErasureMeta { pub coding: u64, } -#[derive(Debug, PartialEq)] -pub enum ErasureMetaStatus { - CanRecover, - DataFull, - StillNeed(usize), -} - +#[cfg(feature = "erasure")] impl ErasureMeta { pub fn new(set_index: u64) -> ErasureMeta { ErasureMeta { @@ -85,71 +81,46 @@ impl ErasureMeta { } } - pub fn status(&self) -> ErasureMetaStatus { + pub fn can_recover(&self) -> bool { let (data_missing, coding_missing) = ( NUM_DATA - self.data.count_ones() as usize, NUM_CODING - self.coding.count_ones() as usize, ); - if data_missing > 0 && data_missing + coding_missing <= NUM_CODING { - ErasureMetaStatus::CanRecover - } else if data_missing == 0 { - ErasureMetaStatus::DataFull - } else { - ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING) - } + + data_missing > 0 && data_missing + coding_missing <= NUM_CODING } pub fn is_coding_present(&self, index: u64) -> bool { - let start = self.start_index(); - let end = start + NUM_CODING as u64; + let set_index = Self::set_index_for(index); + let position = index - self.start_index(); - if start <= index && index < end { - let position = index - start; - - self.coding & (1 << position) != 0 - } else { - false - } + set_index == self.set_index && self.coding & (1 << position) != 0 } - pub fn set_coding_present(&mut self, index: u64, present: bool) { + pub fn set_coding_present(&mut self, index: u64) { let set_index = Self::set_index_for(index); if set_index as u64 == self.set_index { let position = index - self.start_index(); - if present { - self.coding |= 1 << position; - } else { - self.coding &= !(1 << position); - } + self.coding |= 1 << position; } } pub fn is_data_present(&self, index: u64) -> bool { - let start = self.start_index(); - let end = start + NUM_DATA as u64; + let set_index = Self::set_index_for(index); + let position = index - self.start_index(); - if start <= index && index < end { - let position = index - start; - - self.data & (1 << position) != 0 - } else { - false - } + set_index == self.set_index && self.data & (1 << position) != 0 } - pub fn set_data_present(&mut self, index: u64, present: bool) { + pub fn set_data_present(&mut self, index: u64) { let set_index = Self::set_index_for(index); if set_index as u64 == self.set_index { let position = index - self.start_index(); - if present { - self.data |= 1 << position; - } else { - self.data &= !(1 << position); 
- } + self.data |= 1 << position; } } @@ -168,29 +139,7 @@ impl ErasureMeta { } } -#[test] -fn test_meta_indexes() { - use rand::{thread_rng, Rng}; - - let mut rng = thread_rng(); - - for _ in 0..100 { - let set_index = rng.gen_range(0, 1_000); - let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16); - - assert_eq!(set_index, ErasureMeta::set_index_for(blob_index)); - let e_meta = ErasureMeta::new(set_index); - - assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64); - let (data_end_idx, coding_end_idx) = e_meta.end_indexes(); - assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64); - assert_eq!( - coding_end_idx, - set_index * NUM_DATA as u64 + NUM_CODING as u64 - ); - } -} - +#[cfg(feature = "erasure")] #[test] fn test_meta_coding_present() { let set_index = 0; @@ -201,7 +150,7 @@ fn test_meta_coding_present() { }; for i in 0..NUM_CODING as u64 { - e_meta.set_coding_present(i, true); + e_meta.set_coding_present(i); assert_eq!(e_meta.is_coding_present(i), true); } for i in NUM_CODING as u64..NUM_DATA as u64 { @@ -211,7 +160,7 @@ fn test_meta_coding_present() { e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64); for i in (NUM_DATA * 17) as u64..((NUM_DATA * 17) + NUM_CODING) as u64 { - e_meta.set_coding_present(i, true); + e_meta.set_coding_present(i); assert_eq!(e_meta.is_coding_present(i), true); } for i in (NUM_DATA * 17 + NUM_CODING) as u64..((NUM_DATA * 17) + NUM_DATA) as u64 { @@ -219,8 +168,9 @@ fn test_meta_coding_present() { } } +#[cfg(feature = "erasure")] #[test] -fn test_erasure_meta_status() { +fn test_can_recover() { let set_index = 0; let mut e_meta = ErasureMeta { set_index, @@ -228,63 +178,36 @@ fn test_erasure_meta_status() { coding: 0, }; - assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA)); + assert!(!e_meta.can_recover()); e_meta.data = 0b1111_1111_1111_1111; e_meta.coding = 0x00; - assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + assert!(!e_meta.can_recover()); e_meta.coding = 0x0e; - assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + assert_eq!(0x0fu8, 0b0000_1111u8); + assert!(!e_meta.can_recover()); e_meta.data = 0b0111_1111_1111_1111; - assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); + assert!(e_meta.can_recover()); e_meta.data = 0b0111_1111_1111_1110; - assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); + assert!(e_meta.can_recover()); e_meta.data = 0b0111_1111_1011_1110; - assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); + assert!(e_meta.can_recover()); e_meta.data = 0b0111_1011_1011_1110; - assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1)); + assert!(!e_meta.can_recover()); e_meta.data = 0b0111_1011_1011_1110; - assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1)); + assert!(!e_meta.can_recover()); e_meta.coding = 0b0000_1110; e_meta.data = 0b1111_1111_1111_1100; - assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); + assert!(e_meta.can_recover()); e_meta.data = 0b1111_1111_1111_1000; - assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); -} - -#[test] -fn test_meta_data_present() { - let set_index = 0; - let mut e_meta = ErasureMeta { - set_index, - data: 0, - coding: 0, - }; - - for i in 0..NUM_DATA as u64 { - e_meta.set_data_present(i, true); - assert_eq!(e_meta.is_data_present(i), true); - } - for i in NUM_DATA as u64..2 * NUM_DATA as u64 { - assert_eq!(e_meta.is_data_present(i), false); - } - - e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64); - - for i in (NUM_DATA * 23) as 
u64..(NUM_DATA * 24) as u64 { - e_meta.set_data_present(i, true); - assert_eq!(e_meta.is_data_present(i), true); - } - for i in (NUM_DATA * 22) as u64..(NUM_DATA * 23) as u64 { - assert_eq!(e_meta.is_data_present(i), false); - } + assert!(e_meta.can_recover()); } diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index 4d881d935..0f3bdaf52 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -30,7 +30,9 @@ impl Backend for Rocks { type Error = rocksdb::Error; fn open(path: &Path) -> Result { - use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta}; + #[cfg(feature = "erasure")] + use crate::blocktree::db::columns::ErasureMeta; + use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta}; fs::create_dir_all(&path)?; @@ -41,6 +43,7 @@ impl Backend for Rocks { let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); + #[cfg(feature = "erasure")] let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); @@ -49,6 +52,7 @@ impl Backend for Rocks { meta_cf_descriptor, data_cf_descriptor, erasure_cf_descriptor, + #[cfg(feature = "erasure")] erasure_meta_cf_descriptor, orphans_cf_descriptor, ]; @@ -60,10 +64,13 @@ impl Backend for Rocks { } fn columns(&self) -> Vec<&'static str> { - use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta}; + #[cfg(feature = "erasure")] + use crate::blocktree::db::columns::ErasureMeta; + use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta}; vec![ Coding::NAME, + #[cfg(feature = "erasure")] ErasureMeta::NAME, Data::NAME, Orphans::NAME, @@ -189,6 +196,7 @@ impl TypedColumn for cf::SlotMeta { type Type = super::SlotMeta; } +#[cfg(feature = "erasure")] impl Column for cf::ErasureMeta { const NAME: &'static str = super::ERASURE_META_CF; type Index = (u64, u64); @@ -208,6 +216,7 @@ impl Column for cf::ErasureMeta { } } +#[cfg(feature = "erasure")] impl TypedColumn for cf::ErasureMeta { type Type = super::ErasureMeta; } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 32fe2ef40..c23227dc1 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -3,6 +3,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; use crate::entry::{EntrySender, EntrySlice}; +#[cfg(feature = "erasure")] use crate::erasure::CodingGenerator; use crate::packet::index_blobs; use crate::poh_recorder::WorkingBankEntries; @@ -28,6 +29,8 @@ pub enum BroadcastStageReturnType { struct Broadcast { id: Pubkey, + + #[cfg(feature = "erasure")] coding_generator: CodingGenerator, } @@ -116,6 +119,7 @@ impl Broadcast { blocktree.write_shared_blobs(&blobs)?; + #[cfg(feature = "erasure")] let coding = self.coding_generator.next(&blobs); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); @@ -125,10 +129,14 @@ impl Broadcast { // Send out data ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; + #[cfg(feature = "erasure")] + ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - // send out erasures - 
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; + // generate and transmit any erasure coding blobs. if erasure isn't supported, just send everything again + #[cfg(not(feature = "erasure"))] + ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); @@ -186,11 +194,11 @@ impl BroadcastStage { storage_entry_sender: EntrySender, ) -> BroadcastStageReturnType { let me = cluster_info.read().unwrap().my_data().clone(); - let coding_generator = CodingGenerator::default(); let mut broadcast = Broadcast { id: me.id, - coding_generator, + #[cfg(feature = "erasure")] + coding_generator: CodingGenerator::new(), }; loop { @@ -276,9 +284,9 @@ mod test { use crate::entry::create_ticks; use crate::service::Service; use solana_runtime::bank::Bank; - use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -313,9 +321,7 @@ mod test { let exit_sender = Arc::new(AtomicBool::new(false)); let (storage_sender, _receiver) = channel(); - - let (genesis_block, _) = GenesisBlock::new(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); + let bank = Arc::new(Bank::default()); // Start up the broadcast stage let broadcast_service = BroadcastStage::new( @@ -335,13 +341,15 @@ mod test { } #[test] + #[ignore] + //TODO this test won't work since broadcast stage no longer edits the ledger fn test_broadcast_ledger() { - solana_logger::setup(); let ledger_path = get_tmp_ledger_path("test_broadcast_ledger"); - { // Create the leader scheduler let leader_keypair = Keypair::new(); + let start_tick_height = 0; + let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT; let (entry_sender, entry_receiver) = channel(); let broadcast_service = setup_dummy_broadcast_service( @@ -350,9 +358,6 @@ mod test { entry_receiver, ); let bank = broadcast_service.bank.clone(); - let start_tick_height = bank.tick_height(); - let max_tick_height = bank.max_tick_height(); - let ticks_per_slot = bank.ticks_per_slot(); let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); for (i, tick) in ticks.into_iter().enumerate() { @@ -362,23 +367,15 @@ mod test { } sleep(Duration::from_millis(2000)); - - trace!( - "[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}", - max_tick_height, - start_tick_height, - ticks_per_slot, - ); - let blocktree = broadcast_service.blocktree; let mut blob_index = 0; for i in 0..max_tick_height - start_tick_height { - let slot = (start_tick_height + i + 1) / ticks_per_slot; + let slot = (start_tick_height + i + 1) / DEFAULT_TICKS_PER_SLOT; let result = blocktree.get_data_blob(slot, blob_index).unwrap(); blob_index += 1; - result.expect("expect blob presence"); + assert!(result.is_some()); } drop(entry_sender); diff --git a/core/src/erasure.rs b/core/src/erasure.rs index d79889306..b7fdffc2b 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -1,217 +1,278 @@ -//! # Erasure Coding and Recovery -//! -//! Blobs are logically grouped into erasure sets or blocks. Each set contains 16 sequential data -//! blobs and 4 sequential coding blobs. -//! -//! Coding blobs in each set starting from `start_idx`: -//! For each erasure set: -//! generate `NUM_CODING` coding_blobs. -//! 
index the coding blobs from `start_idx` to `start_idx + NUM_CODING - 1`. -//! -//! model of an erasure set, with top row being data blobs and second being coding -//! |<======================= NUM_DATA ==============================>| -//! |<==== NUM_CODING ===>| -//! +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ -//! | D | | D | | D | | D | | D | | D | | D | | D | | D | | D | -//! +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+ -//! | C | | C | | C | | C | | | | | | | | | | | | | -//! +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ -//! -//! blob structure for coding blobs -//! -//! + ------- meta is set and used by transport, meta.size is actual length -//! | of data in the byte array blob.data -//! | -//! | + -- data is stuff shipped over the wire, and has an included -//! | | header -//! V V -//! +----------+------------------------------------------------------------+ -//! | meta | data | -//! |+---+-- |+---+---+---+---+------------------------------------------+| -//! || s | . || i | | f | s | || -//! || i | . || n | i | l | i | || -//! || z | . || d | d | a | z | blob.data(), or blob.data_mut() || -//! || e | || e | | g | e | || -//! |+---+-- || x | | s | | || -//! | |+---+---+---+---+------------------------------------------+| -//! +----------+------------------------------------------------------------+ -//! | |<=== coding blob part for "coding" =======>| -//! | | -//! |<============== data blob part for "coding" ==============>| -//! -//! -use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; -use crate::result::Result; +// Support erasure coding +use crate::packet::{Blob, SharedBlob}; +use crate::result::{Error, Result}; use std::cmp; -use std::convert::AsMut; use std::sync::{Arc, RwLock}; -use reed_solomon_erasure::ReedSolomon; - //TODO(sakridge) pick these values -/// Number of data blobs -pub const NUM_DATA: usize = 16; -/// Number of coding blobs; also the maximum number that can go missing. -pub const NUM_CODING: usize = 4; -/// Total number of blobs in an erasure set; includes data and coding blobs -pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; +pub const NUM_DATA: usize = 16; // number of data blobs +pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing +pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs -/// Represents an erasure "session" with a particular configuration and number of data and coding -/// blobs -#[derive(Debug, Clone)] -pub struct Session(ReedSolomon); - -/// Generates coding blobs on demand given data blobs -#[derive(Debug, Clone)] -pub struct CodingGenerator { - /// SharedBlobs that couldn't be used in last call to next() - leftover: Vec, - session: Arc, +macro_rules! 
align { + ($x:expr, $align:expr) => { + $x + ($align - 1) & !($align - 1) + }; } -impl Session { - pub fn new(data_count: usize, coding_count: usize) -> Result { - let rs = ReedSolomon::new(data_count, coding_count)?; +#[derive(Debug, PartialEq, Eq)] +pub enum ErasureError { + NotEnoughBlocksToDecode, + DecodeError, + EncodeError, + InvalidBlockSize, + InvalidBlobData, + CorruptCoding, +} - Ok(Session(rs)) +// k = number of data devices +// m = number of coding devices +// w = word size + +extern "C" { + fn jerasure_matrix_encode( + k: i32, + m: i32, + w: i32, + matrix: *const i32, + data_ptrs: *const *const u8, + coding_ptrs: *const *mut u8, + size: i32, + ); + fn jerasure_matrix_decode( + k: i32, + m: i32, + w: i32, + matrix: *const i32, + row_k_ones: i32, + erasures: *const i32, + data_ptrs: *const *mut u8, + coding_ptrs: *const *mut u8, + size: i32, + ) -> i32; + fn galois_single_divide(a: i32, b: i32, w: i32) -> i32; + fn galois_init_default_field(w: i32) -> i32; +} + +use std::sync::Once; +static ERASURE_W_ONCE: Once = Once::new(); + +// jerasure word size of 32 +fn w() -> i32 { + let w = 32; + unsafe { + ERASURE_W_ONCE.call_once(|| { + galois_init_default_field(w); + () + }); } + w +} - /// Create coding blocks by overwriting `parity` - pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> { - self.0.encode_sep(data, parity)?; +// jerasure checks that arrays are a multiple of w()/8 in length +fn wb() -> usize { + (w() / 8) as usize +} - Ok(()) - } - - /// Recover data + coding blocks into data blocks - /// # Arguments - /// * `data` - array of data blocks to recover into - /// * `coding` - array of coding blocks - /// * `erasures` - list of indices in data where blocks should be recovered - pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> { - self.0.reconstruct(blocks, present)?; - - Ok(()) - } - - /// Returns `(number_of_data_blobs, number_of_coding_blobs)` - pub fn dimensions(&self) -> (usize, usize) { - (self.0.data_shard_count(), self.0.parity_shard_count()) - } - - /// Reconstruct any missing blobs in this erasure set if possible - /// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata - /// Assumes that the user has sliced into the blobs appropriately already. 
else recovery will - /// return an error or garbage data - pub fn reconstruct_blobs( - &self, - blobs: &mut [B], - present: &[bool], - size: usize, - block_start_idx: u64, - slot: u64, - ) -> Result<(Vec, Vec)> - where - B: AsMut<[u8]>, - { - let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect(); - - trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,); - - // Decode the blocks - self.decode_blocks(blocks.as_mut_slice(), &present)?; - - let mut recovered_data = vec![]; - let mut recovered_coding = vec![]; - - let erasures = present - .iter() - .enumerate() - .filter_map(|(i, present)| if *present { None } else { Some(i) }); - - // Create the missing blobs from the reconstructed data - for n in erasures { - let data_size; - let idx; - let first_byte; - - if n < NUM_DATA { - let mut blob = Blob::new(&blocks[n]); - - data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; - idx = n as u64 + block_start_idx; - first_byte = blob.data[0]; - - blob.set_size(data_size); - recovered_data.push(blob); - } else { - let mut blob = Blob::default(); - blob.data_mut()[..size].copy_from_slice(&blocks[n]); - data_size = size; - idx = (n as u64 + block_start_idx) - NUM_DATA as u64; - first_byte = blob.data[0]; - - blob.set_slot(slot); - blob.set_index(idx); - blob.set_size(data_size); - recovered_coding.push(blob); +fn get_matrix(m: i32, k: i32, w: i32) -> Vec { + let mut matrix = vec![0; (m * k) as usize]; + for i in 0..m { + for j in 0..k { + unsafe { + matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w); } - - trace!( - "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}", - n, - idx, - data_size, - first_byte - ); } + } + matrix +} - Ok((recovered_data, recovered_coding)) +// Generate coding blocks into coding +// There are some alignment restrictions, blocks should be aligned by 16 bytes +// which means their size should be >= 16 bytes +fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> { + if data.is_empty() { + return Ok(()); + } + let k = data.len() as i32; + let m = coding.len() as i32; + let block_len = data[0].len() as i32; + let matrix: Vec = get_matrix(m, k, w()); + let mut data_arg = Vec::with_capacity(data.len()); + for block in data { + if block_len != block.len() as i32 { + error!( + "data block size incorrect {} expected {}", + block.len(), + block_len + ); + return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); + } + data_arg.push(block.as_ptr()); + } + let mut coding_arg = Vec::with_capacity(coding.len()); + for block in coding { + if block_len != block.len() as i32 { + error!( + "coding block size incorrect {} expected {}", + block.len(), + block_len + ); + return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); + } + coding_arg.push(block.as_mut_ptr()); + } + + unsafe { + jerasure_matrix_encode( + k, + m, + w(), + matrix.as_ptr(), + data_arg.as_ptr(), + coding_arg.as_ptr(), + block_len, + ); + } + Ok(()) +} + +// Recover data + coding blocks into data blocks +// data: array of blocks to recover into +// coding: arry of coding blocks +// erasures: list of indices in data where blocks should be recovered +pub fn decode_blocks( + data: &mut [&mut [u8]], + coding: &mut [&mut [u8]], + erasures: &[i32], +) -> Result<()> { + if data.is_empty() { + return Ok(()); + } + let block_len = data[0].len(); + let matrix: Vec = get_matrix(coding.len() as i32, data.len() as i32, w()); + + // generate coding pointers, blocks should be the same size + let mut coding_arg: Vec<*mut 
u8> = Vec::new(); + for x in coding.iter_mut() { + if x.len() != block_len { + return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); + } + coding_arg.push(x.as_mut_ptr()); + } + + // generate data pointers, blocks should be the same size + let mut data_arg: Vec<*mut u8> = Vec::new(); + for x in data.iter_mut() { + if x.len() != block_len { + return Err(Error::ErasureError(ErasureError::InvalidBlockSize)); + } + data_arg.push(x.as_mut_ptr()); + } + let ret = unsafe { + jerasure_matrix_decode( + data.len() as i32, + coding.len() as i32, + w(), + matrix.as_ptr(), + 0, + erasures.as_ptr(), + data_arg.as_ptr(), + coding_arg.as_ptr(), + data[0].len() as i32, + ) + }; + trace!("jerasure_matrix_decode ret: {}", ret); + for x in data[erasures[0] as usize][0..8].iter() { + trace!("{} ", x) + } + trace!(""); + if ret < 0 { + return Err(Error::ErasureError(ErasureError::DecodeError)); + } + Ok(()) +} + +// Generate coding blocks in window starting from start_idx, +// for num_blobs.. For each block place the coding blobs +// at the start of the block like so: +// +// model of an erasure set, with top row being data blobs and second being coding +// |<======================= NUM_DATA ==============================>| +// |<==== NUM_CODING ===>| +// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +// | D | | D | | D | | D | | D | | D | | D | | D | | D | | D | +// +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+ +// | C | | C | | C | | C | | | | | | | | | | | | | +// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +// +// blob structure for coding, recover +// +// + ------- meta is set and used by transport, meta.size is actual length +// | of data in the byte array blob.data +// | +// | + -- data is stuff shipped over the wire, and has an included +// | | header +// V V +// +----------+------------------------------------------------------------+ +// | meta | data | +// |+---+-- |+---+---+---+---+------------------------------------------+| +// || s | . || i | | f | s | || +// || i | . || n | i | l | i | || +// || z | . || d | d | a | z | blob.data(), or blob.data_mut() || +// || e | || e | | g | e | || +// |+---+-- || x | | s | | || +// | |+---+---+---+---+------------------------------------------+| +// +----------+------------------------------------------------------------+ +// | |<=== coding blob part for "coding" =======>| +// | | +// |<============== data blob part for "coding" ==============>| +// +// +// +pub struct CodingGenerator { + leftover: Vec, // SharedBlobs that couldn't be used in last call to next() +} + +impl Default for CodingGenerator { + fn default() -> Self { + CodingGenerator { + leftover: Vec::with_capacity(NUM_DATA), + } } } impl CodingGenerator { - pub fn new(session: Arc) -> Self { - CodingGenerator { - leftover: Vec::with_capacity(session.0.data_shard_count()), - session, - } + pub fn new() -> Self { + Self::default() } - /// Yields next set of coding blobs, if any. - /// Must be called with consecutive data blobs within a slot. - /// - /// Passing in a slice with the first blob having a new slot will cause internal state to - /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot - /// must be consecutive. - /// - /// If used improperly, it my return garbage coding blobs, but will not give an - /// error. 
+ // must be called with consecutive data blobs from previous invocation + // blobs from a new slot not start halfway through next_data pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec { - let (num_data, num_coding) = self.session.dimensions(); let mut next_coding = - Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding); + Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING); - if !self.leftover.is_empty() - && !next_data.is_empty() - && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() - { - self.leftover.clear(); + if self.leftover.len() > 0 && next_data.len() > 0 { + if self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() { + self.leftover.clear(); // reset on slot boundaries + } } - let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect(); - for data_blobs in next_data.chunks(num_data) { - if data_blobs.len() < num_data { + for data_blobs in next_data.chunks(NUM_DATA) { + if data_blobs.len() < NUM_DATA { self.leftover = data_blobs.to_vec(); break; } self.leftover.clear(); - // find max_data_size for the erasure set - let max_data_size = data_blobs - .iter() - .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)); + // find max_data_size for the chunk, round length up to a multiple of wb() + let max_data_size = align!( + data_blobs + .iter() + .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)), + wb() + ); let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); let data_ptrs: Vec<_> = data_locks @@ -219,9 +280,9 @@ impl CodingGenerator { .map(|l| &l.data[..max_data_size]) .collect(); - let mut coding_blobs = Vec::with_capacity(num_coding); + let mut coding_blobs = Vec::with_capacity(NUM_CODING); - for data_blob in &data_locks[..num_coding] { + for data_blob in &data_locks[..NUM_CODING] { let index = data_blob.index(); let slot = data_blob.slot(); let id = data_blob.id(); @@ -244,7 +305,7 @@ impl CodingGenerator { .map(|blob| &mut blob.data_mut()[..max_data_size]) .collect(); - self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice()) + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs) } .is_ok() { @@ -259,27 +320,12 @@ impl CodingGenerator { } } -impl Default for Session { - fn default() -> Session { - Session::new(NUM_DATA, NUM_CODING).unwrap() - } -} - -impl Default for CodingGenerator { - fn default() -> Self { - let session = Session::default(); - CodingGenerator { - leftover: Vec::with_capacity(session.0.data_shard_count()), - session: Arc::new(session), - } - } -} - #[cfg(test)] pub mod test { use super::*; use crate::blocktree::get_tmp_ledger_path; use crate::blocktree::Blocktree; + use crate::entry::{make_tiny_test_entries, EntrySlice}; use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -322,63 +368,63 @@ pub mod test { #[test] fn test_coding() { - const N_DATA: usize = 4; - const N_CODING: usize = 2; - - let session = Session::new(N_DATA, N_CODING).unwrap(); - - let mut vs: Vec> = (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect(); + let zero_vec = vec![0; 16]; + let mut vs: Vec> = (0..4).map(|i| (i..(16 + i)).collect()).collect(); let v_orig: Vec = vs[0].clone(); - let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect(); + let m = 2; + let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect(); - let mut 
coding_blocks_slices: Vec<_> = - coding_blocks.iter_mut().map(Vec::as_mut_slice).collect(); - let v_slices: Vec<_> = vs.iter().map(Vec::as_slice).collect(); - - session - .encode(v_slices.as_slice(), coding_blocks_slices.as_mut_slice()) - .expect("encoding must succeed"); + { + let mut coding_blocks_slices: Vec<_> = + coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect(); + let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect(); + assert!(generate_coding_blocks( + coding_blocks_slices.as_mut_slice(), + v_slices.as_slice(), + ) + .is_ok()); + } trace!("test_coding: coding blocks:"); for b in &coding_blocks { trace!("test_coding: {:?}", b); } - - let erasure: usize = 1; - let present = &mut [true; N_DATA + N_CODING]; - present[erasure] = false; - let erased = vs[erasure].clone(); - + let erasure: i32 = 1; + let erasures = vec![erasure, -1]; // clear an entry - vs[erasure as usize].copy_from_slice(&[0; 16]); + vs[erasure as usize].copy_from_slice(zero_vec.as_slice()); - let mut blocks: Vec<_> = vs - .iter_mut() - .chain(coding_blocks.iter_mut()) - .map(Vec::as_mut_slice) - .collect(); + { + let mut coding_blocks_slices: Vec<_> = + coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect(); + let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect(); - session - .decode_blocks(blocks.as_mut_slice(), present) - .expect("decoding must succeed"); + assert!(decode_blocks( + v_slices.as_mut_slice(), + coding_blocks_slices.as_mut_slice(), + erasures.as_slice(), + ) + .is_ok()); + } trace!("test_coding: vs:"); for v in &vs { trace!("test_coding: {:?}", v); } assert_eq!(v_orig, vs[0]); - assert_eq!(erased, vs[erasure]); } fn test_toss_and_recover( - session: &Session, data_blobs: &[SharedBlob], coding_blobs: &[SharedBlob], block_start_idx: usize, ) { let size = coding_blobs[0].read().unwrap().size(); + // toss one data and one coding + let erasures: Vec = vec![0, NUM_DATA as i32, -1]; + let mut blobs: Vec = Vec::with_capacity(ERASURE_SET_SIZE); blobs.push(SharedBlob::default()); // empty data, erasure at zero @@ -386,23 +432,14 @@ pub mod test { // skip first blob blobs.push(blob.clone()); } - blobs.push(SharedBlob::default()); // empty coding, erasure at zero for blob in &coding_blobs[1..NUM_CODING] { blobs.push(blob.clone()); } - // toss one data and one coding - let mut present = vec![true; blobs.len()]; - present[0] = false; - present[NUM_DATA] = false; + let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap(); - let (recovered_data, recovered_coding) = session - .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0) - .expect("reconstruction must succeed"); - - assert_eq!(recovered_data.len(), 1); - assert_eq!(recovered_coding.len(), 1); + assert!(!corrupt); assert_eq!( blobs[1].read().unwrap().meta, @@ -413,15 +450,15 @@ pub mod test { data_blobs[block_start_idx + 1].read().unwrap().data() ); assert_eq!( - recovered_data[0].meta, + blobs[0].read().unwrap().meta, data_blobs[block_start_idx].read().unwrap().meta ); assert_eq!( - recovered_data[0].data(), + blobs[0].read().unwrap().data(), data_blobs[block_start_idx].read().unwrap().data() ); assert_eq!( - recovered_coding[0].data(), + blobs[NUM_DATA].read().unwrap().data(), coding_blobs[0].read().unwrap().data() ); } @@ -431,11 +468,11 @@ pub mod test { solana_logger::setup(); // trivial case - let mut coding_generator = CodingGenerator::default(); + let mut coding_generator = CodingGenerator::new(); let blobs = Vec::new(); for _ in 0..NUM_DATA * 2 { 
let coding = coding_generator.next(&blobs); - assert!(coding.is_empty()); + assert_eq!(coding.len(), 0); } // test coding by iterating one blob at a time @@ -443,7 +480,6 @@ pub mod test { for (i, blob) in data_blobs.iter().cloned().enumerate() { let coding_blobs = coding_generator.next(&[blob]); - if !coding_blobs.is_empty() { assert_eq!(i % NUM_DATA, NUM_DATA - 1); assert_eq!(coding_blobs.len(), NUM_CODING); @@ -454,12 +490,7 @@ pub mod test { ((i / NUM_DATA) * NUM_DATA + j) as u64 ); } - test_toss_and_recover( - &coding_generator.session, - &data_blobs, - &coding_blobs, - i - (i % NUM_DATA), - ); + test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA)); } } } @@ -468,7 +499,7 @@ pub mod test { fn test_erasure_generate_coding_reset_on_new_slot() { solana_logger::setup(); - let mut coding_generator = CodingGenerator::default(); + let mut coding_generator = CodingGenerator::new(); // test coding by iterating one blob at a time let data_blobs = generate_test_blobs(0, NUM_DATA * 2); @@ -478,18 +509,13 @@ pub mod test { } let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]); - assert!(coding_blobs.is_empty()); + assert_eq!(coding_blobs.len(), 0); let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]); assert_eq!(coding_blobs.len(), NUM_CODING); - test_toss_and_recover( - &coding_generator.session, - &data_blobs, - &coding_blobs, - NUM_DATA, - ); + test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA); } #[test] @@ -545,17 +571,24 @@ pub mod test { } } + /// This test is ignored because if successful, it never stops running. It is useful for + /// discovering an initialization race condition in the erasure FFI bindings. If this bug + /// re-emerges, running with `Z_THREADS = N` where `N > 1` should crash fairly rapidly. 
+ #[ignore] #[test] fn test_recovery_with_model() { + use std::env; + use std::sync::{Arc, Mutex}; use std::thread; const MAX_ERASURE_SETS: u64 = 16; - const N_THREADS: usize = 2; - const N_SLOTS: u64 = 10; - solana_logger::setup(); + let n_threads: usize = env::var("Z_THREADS") + .unwrap_or("1".to_string()) + .parse() + .unwrap(); - let specs = (0..N_SLOTS).map(|slot| { + let specs = (0..).map(|slot| { let num_erasure_sets = slot % MAX_ERASURE_SETS; let set_specs = (0..num_erasure_sets) @@ -569,12 +602,12 @@ pub mod test { SlotSpec { slot, set_specs } }); + let decode_mutex = Arc::new(Mutex::new(())); let mut handles = vec![]; - let session = Arc::new(Session::default()); - for i in 0..N_THREADS { + for i in 0..n_threads { let specs = specs.clone(); - let session = Arc::clone(&session); + let decode_mutex = Arc::clone(&decode_mutex); let handle = thread::Builder::new() .name(i.to_string()) @@ -584,39 +617,55 @@ pub mod test { let erased_coding = erasure_set.coding[0].clone(); let erased_data = erasure_set.data[..3].to_vec(); - let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); + let mut data = Vec::with_capacity(NUM_DATA); + let mut coding = Vec::with_capacity(NUM_CODING); + let erasures = vec![0, 1, 2, NUM_DATA as i32, -1]; - blobs.push(SharedBlob::default()); - blobs.push(SharedBlob::default()); - blobs.push(SharedBlob::default()); + data.push(SharedBlob::default()); + data.push(SharedBlob::default()); + data.push(SharedBlob::default()); for blob in erasure_set.data.into_iter().skip(3) { - blobs.push(blob); + data.push(blob); } - blobs.push(SharedBlob::default()); + coding.push(SharedBlob::default()); for blob in erasure_set.coding.into_iter().skip(1) { - blobs.push(blob); + coding.push(blob); } - let size = erased_coding.read().unwrap().size() as usize; + let size = erased_coding.read().unwrap().data_size() as usize; - let mut present = vec![true; ERASURE_SET_SIZE]; - present[0] = false; - present[1] = false; - present[2] = false; - present[NUM_DATA] = false; + let mut data_locks: Vec<_> = + data.iter().map(|shared| shared.write().unwrap()).collect(); + let mut coding_locks: Vec<_> = coding + .iter() + .map(|shared| shared.write().unwrap()) + .collect(); - session - .reconstruct_shared_blobs( - &mut blobs, - &present, - size, - erasure_set.set_index * NUM_DATA as u64, - slot_model.slot, + let mut data_ptrs: Vec<_> = data_locks + .iter_mut() + .map(|blob| &mut blob.data[..size]) + .collect(); + let mut coding_ptrs: Vec<_> = coding_locks + .iter_mut() + .map(|blob| &mut blob.data_mut()[..size]) + .collect(); + + { + let _lock = decode_mutex.lock(); + + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, ) - .expect("reconstruction must succeed"); + .expect("decoding must succeed"); + } - for (expected, recovered) in erased_data.iter().zip(blobs.iter()) { + drop(coding_locks); + drop(data_locks); + + for (expected, recovered) in erased_data.iter().zip(data.iter()) { let expected = expected.read().unwrap(); let mut recovered = recovered.write().unwrap(); let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE; @@ -628,7 +677,7 @@ pub mod test { assert_eq!( erased_coding.read().unwrap().data(), - blobs[NUM_DATA].read().unwrap().data() + coding[0].read().unwrap().data() ); debug!("passed set: {}", erasure_set.set_index); @@ -653,9 +702,7 @@ pub mod test { IntoIt: Iterator + Clone + 'a, S: Borrow, { - let mut coding_generator = CodingGenerator::default(); - - specs.into_iter().map(move |spec| { + specs.into_iter().map(|spec| { let spec = 
spec.borrow(); let slot = spec.slot; @@ -666,7 +713,7 @@ pub mod test { let set_index = erasure_spec.set_index as usize; let start_index = set_index * NUM_DATA; - let mut blobs = generate_test_blobs(0, NUM_DATA); + let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs(); index_blobs( &blobs, &Keypair::new().pubkey(), @@ -675,6 +722,7 @@ pub mod test { 0, ); + let mut coding_generator = CodingGenerator::new(); let mut coding_blobs = coding_generator.next(&blobs); blobs.drain(erasure_spec.num_data..); @@ -722,60 +770,84 @@ pub mod test { blocktree } - // fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool { - // let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); - // - // blobs.iter().enumerate().all(|(i, blob)| { - // let blob = blob.read().unwrap(); - // blob.index() as usize == i + offset && blob.data() == &data[..] - // }) - // } - // fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { - let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); - - let blobs: Vec<_> = (0..num_blobs) - .into_iter() - .map(|_| { - let mut blob = Blob::default(); - blob.data_mut()[..data.len()].copy_from_slice(&data); - blob.set_size(data.len()); - Arc::new(RwLock::new(blob)) - }) - .collect(); + let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs(); index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0); - blobs } - impl Session { - fn reconstruct_shared_blobs( - &self, - blobs: &mut [SharedBlob], - present: &[bool], - size: usize, - block_start_idx: u64, - slot: u64, - ) -> Result<(Vec, Vec)> { - let mut locks: Vec> = blobs - .iter() - .map(|shared_blob| shared_blob.write().unwrap()) - .collect(); + fn decode_blobs( + blobs: &[SharedBlob], + erasures: &[i32], + size: usize, + block_start_idx: u64, + slot: u64, + ) -> Result { + let mut locks = Vec::with_capacity(ERASURE_SET_SIZE); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); + let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); - let mut slices: Vec<_> = locks - .iter_mut() - .enumerate() - .map(|(i, blob)| { - if i < NUM_DATA { - &mut blob.data[..size] - } else { - &mut blob.data_mut()[..size] - } - }) - .collect(); - - self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot) + assert_eq!(blobs.len(), ERASURE_SET_SIZE); + for b in blobs { + locks.push(b.write().unwrap()); } + + for (i, l) in locks.iter_mut().enumerate() { + if i < NUM_DATA { + data_ptrs.push(&mut l.data[..size]); + } else { + coding_ptrs.push(&mut l.data_mut()[..size]); + } + } + + // Decode the blocks + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + + // Create the missing blobs from the reconstructed data + let mut corrupt = false; + + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + let mut idx = n as u64 + block_start_idx; + + let mut data_size; + if n < NUM_DATA { + data_size = locks[n].data_size() as usize; + data_size -= BLOB_HEADER_SIZE; + if data_size > BLOB_DATA_SIZE { + error!("corrupt data blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } else { + data_size = size; + idx -= NUM_DATA as u64; + locks[n].set_slot(slot); + locks[n].set_index(idx); + + if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { + error!("corrupt coding blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } + + locks[n].set_size(data_size); + trace!( + "erasures[{}] ({}) size: {} data[0]: {}", + *i, + idx, + 
data_size, + locks[n].data()[0] + ); + } + + Ok(corrupt) } + } diff --git a/core/src/lib.rs b/core/src/lib.rs index c6206781f..82b28dc75 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -31,6 +31,7 @@ pub mod cluster; pub mod cluster_info; pub mod cluster_tests; pub mod entry; +#[cfg(feature = "erasure")] pub mod erasure; pub mod fetch_stage; pub mod fullnode; diff --git a/core/src/packet.rs b/core/src/packet.rs index 5194015db..68802f427 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -376,11 +376,7 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1; impl Blob { pub fn new(data: &[u8]) -> Self { let mut blob = Self::default(); - - assert!(data.len() <= blob.data.len()); - let data_len = cmp::min(data.len(), blob.data.len()); - let bytes = &data[..data_len]; blob.data[..data_len].copy_from_slice(bytes); blob.meta.size = blob.data_size() as usize; @@ -467,8 +463,8 @@ impl Blob { LittleEndian::read_u64(&self.data[SIZE_RANGE]) } - pub fn set_data_size(&mut self, size: u64) { - LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size); + pub fn set_data_size(&mut self, ix: u64) { + LittleEndian::write_u64(&mut self.data[SIZE_RANGE], ix); } pub fn data(&self) -> &[u8] { diff --git a/core/src/result.rs b/core/src/result.rs index 05eb4c84d..fba0bcefe 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -2,6 +2,8 @@ use crate::blocktree; use crate::cluster_info; +#[cfg(feature = "erasure")] +use crate::erasure; use crate::packet; use crate::poh_recorder; use bincode; @@ -23,7 +25,8 @@ pub enum Error { TransactionError(transaction::TransactionError), ClusterInfoError(cluster_info::ClusterInfoError), BlobError(packet::BlobError), - ErasureError(reed_solomon_erasure::Error), + #[cfg(feature = "erasure")] + ErasureError(erasure::ErasureError), SendError, PohRecorderError(poh_recorder::PohRecorderError), BlocktreeError(blocktree::BlocktreeError), @@ -64,8 +67,9 @@ impl std::convert::From for Error { Error::ClusterInfoError(e) } } -impl std::convert::From for Error { - fn from(e: reed_solomon_erasure::Error) -> Error { +#[cfg(feature = "erasure")] +impl std::convert::From for Error { + fn from(e: erasure::ErasureError) -> Error { Error::ErasureError(e) } }
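
Illustrative note (not part of the diff above): the erasures convention used by decode_blocks() and the test helper decode_blobs() is easy to misread. Missing blocks are named by index, data blocks occupy indices 0..NUM_DATA, coding blocks follow at NUM_DATA..ERASURE_SET_SIZE, and the list is terminated by -1. The sketch below is a minimal, self-contained Rust illustration of that indexing only; it substitutes a single XOR parity block for the Jerasure/Reed-Solomon math actually restored by this patch, so it can repair at most one missing block, and the names toy_decode, TOY_NUM_DATA, and TOY_NUM_CODING are hypothetical, not identifiers from this codebase.

// Toy sketch of the erasure-set indexing used by decode_blobs()/decode_blocks().
// NOTE: a single XOR parity block stands in for the Jerasure/Reed-Solomon math;
// only the index convention (data blocks first, coding blocks offset by the
// data count, list terminated by -1) mirrors the patch.

const TOY_NUM_DATA: usize = 4; // stand-in for NUM_DATA
const TOY_NUM_CODING: usize = 1; // one parity block, for simplicity

/// Rebuild the single missing block named in `erasures` (terminated by -1).
fn toy_decode(data: &mut [Vec<u8>], coding: &mut [Vec<u8>], erasures: &[i32]) {
    for &e in erasures {
        if e < 0 {
            break; // -1 terminator, same convention as the call sites above
        }
        let e = e as usize;
        let len = coding[0].len();
        let mut rebuilt = vec![0u8; len];

        // XOR together every block that is still present; with one parity
        // block, the missing block equals the XOR of all the others.
        for (i, block) in data.iter().enumerate() {
            if i != e {
                for (r, b) in rebuilt.iter_mut().zip(block) {
                    *r ^= *b;
                }
            }
        }
        for (i, block) in coding.iter().enumerate() {
            if i + TOY_NUM_DATA != e {
                for (r, b) in rebuilt.iter_mut().zip(block) {
                    *r ^= *b;
                }
            }
        }

        // Indices below TOY_NUM_DATA are data blocks, the rest are coding blocks.
        if e < TOY_NUM_DATA {
            data[e] = rebuilt;
        } else {
            coding[e - TOY_NUM_DATA] = rebuilt;
        }
    }
}

fn main() {
    // Four data blocks and their parity.
    let mut data: Vec<Vec<u8>> = (0..TOY_NUM_DATA as u8).map(|i| vec![i; 8]).collect();
    let mut coding = vec![vec![0u8; 8]; TOY_NUM_CODING];
    for block in &data {
        for (c, b) in coding[0].iter_mut().zip(block) {
            *c ^= *b;
        }
    }

    // "Erase" data block 1 and recover it, using the same erasures shape
    // as the tests in this patch (index list terminated by -1).
    let original = data[1].clone();
    data[1] = vec![0u8; 8];
    toy_decode(&mut data, &mut coding, &[1, -1]);
    assert_eq!(data[1], original);
}

With the real Reed-Solomon coding in erasure.rs, up to NUM_CODING missing blocks per set can be reconstructed, which is why the tests above tolerate tossing one data and one coding blob (test_toss_and_recover) or several of each (test_recovery_with_model) and still expect recovery to succeed.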