diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 0b18c5df73..a2be77cb6f 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -3,6 +3,8 @@ //! access read to a persistent file-based ledger. use crate::entry::Entry; +#[cfg(feature = "erasure")] +use crate::erasure; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; #[cfg(feature = "kvstore")] @@ -15,7 +17,8 @@ use hashbrown::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; -use serde::Serialize; +#[cfg(feature = "erasure")] +use solana_metrics::counter::Counter; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; @@ -30,7 +33,10 @@ use std::rc::Rc; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::{Arc, RwLock}; +pub use self::meta::*; + mod db; +mod meta; macro_rules! db_imports { { $mod:ident, $db:ident, $db_path:expr } => { @@ -67,67 +73,14 @@ pub enum BlocktreeError { KvsDb(kvstore::Error), } -#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] -// The Meta column family -pub struct SlotMeta { - // The number of slots above the root (the genesis block). The first - // slot has slot 0. - pub slot: u64, - // The total number of consecutive blobs starting from index 0 - // we have received for this slot. - pub consumed: u64, - // The index *plus one* of the highest blob received for this slot. Useful - // for checking if the slot has received any blobs yet, and to calculate the - // range where there is one or more holes: `(consumed..received)`. - pub received: u64, - // The index of the blob that is flagged as the last blob for this slot. - pub last_index: u64, - // The slot height of the block this one derives from. - pub parent_slot: u64, - // The list of slot heights, each of which contains a block that derives - // from this one. - pub next_slots: Vec, - // True if this slot is full (consumed == last_index + 1) and if every - // slot that is a parent of this slot is also connected. - pub is_connected: bool, -} - -impl SlotMeta { - pub fn is_full(&self) -> bool { - // last_index is std::u64::MAX when it has no information about how - // many blobs will fill this slot. - // Note: A full slot with zero blobs is not possible. - if self.last_index == std::u64::MAX { - return false; - } - assert!(self.consumed <= self.last_index + 1); - - self.consumed == self.last_index + 1 - } - - pub fn is_parent_set(&self) -> bool { - self.parent_slot != std::u64::MAX - } - - fn new(slot: u64, parent_slot: u64) -> Self { - SlotMeta { - slot, - consumed: 0, - received: 0, - parent_slot, - next_slots: vec![], - is_connected: slot == 0, - last_index: std::u64::MAX, - } - } -} - // ledger window pub struct Blocktree { db: Arc, meta_cf: LedgerColumn, data_cf: LedgerColumn, erasure_cf: LedgerColumn, + #[cfg(feature = "erasure")] + erasure_meta_cf: LedgerColumn, orphans_cf: LedgerColumn, pub new_blobs_signals: Vec>, pub root_slot: RwLock, @@ -139,6 +92,8 @@ pub const META_CF: &str = "meta"; pub const DATA_CF: &str = "data"; // Column family for erasure data pub const ERASURE_CF: &str = "erasure"; +#[cfg(feature = "erasure")] +pub const ERASURE_META_CF: &str = "erasure_meta"; // Column family for orphans data pub const ORPHANS_CF: &str = "orphans"; @@ -161,6 +116,8 @@ impl Blocktree { // Create the erasure column family let erasure_cf = LedgerColumn::new(&db); + #[cfg(feature = "erasure")] + let erasure_meta_cf = LedgerColumn::new(&db); // Create the orphans column family. An "orphan" is defined as // the head of a detached chain of slots, i.e. a slot with no @@ -172,6 +129,8 @@ impl Blocktree { meta_cf, data_cf, erasure_cf, + #[cfg(feature = "erasure")] + erasure_meta_cf, orphans_cf, new_blobs_signals: vec![], root_slot: RwLock::new(0), @@ -314,6 +273,8 @@ impl Blocktree { // A map from slot to a 2-tuple of metadata: (working copy, backup copy), // so we can detect changes to the slot metadata later let mut slot_meta_working_set = HashMap::new(); + #[cfg(feature = "erasure")] + let mut erasure_meta_working_set = HashMap::new(); let new_blobs: Vec<_> = new_blobs.into_iter().collect(); let mut prev_inserted_blob_datas = HashMap::new(); @@ -354,6 +315,21 @@ impl Blocktree { continue; } + #[cfg(feature = "erasure")] + { + let set_index = ErasureMeta::set_index_for(blob.index()); + let erasure_meta_entry = erasure_meta_working_set + .entry((blob_slot, set_index)) + .or_insert_with(|| { + self.erasure_meta_cf + .get((blob_slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index)) + }); + + erasure_meta_entry.set_data_present(blob.index()); + } + let _ = self.insert_data_blob( blob, &mut prev_inserted_blob_datas, @@ -377,13 +353,53 @@ impl Blocktree { } } + #[cfg(feature = "erasure")] + { + for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { + write_batch.put::((*slot, *set_index), erasure_meta)?; + } + } + self.db.write(write_batch)?; + if should_signal { for signal in self.new_blobs_signals.iter() { let _ = signal.try_send(true); } } + #[cfg(feature = "erasure")] + for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() { + if erasure_meta.can_recover() { + match self.recover(slot, set_index) { + Ok(recovered) => { + inc_new_counter_info!("erasures-recovered", recovered); + } + Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => { + let mut erasure_meta = self + .erasure_meta_cf + .get((slot, set_index))? + .expect("erasure meta should exist"); + + let mut batch = self.db.batch()?; + + let start_index = erasure_meta.start_index(); + let (_, coding_end_idx) = erasure_meta.end_indexes(); + + erasure_meta.coding = 0; + batch.put::((slot, set_index), &erasure_meta)?; + + for idx in start_index..coding_end_idx { + batch.delete::((slot, idx))?; + } + + self.db.write(batch)?; + } + Err(e) => return Err(e), + } + } + } + Ok(()) } @@ -457,10 +473,66 @@ impl Blocktree { pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.data_cf.get_bytes((slot, index)) } - pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + + pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { self.erasure_cf.put_bytes((slot, index), bytes) } + #[cfg(not(feature = "erasure"))] + #[inline] + pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + self.put_coding_blob_bytes_raw(slot, index, bytes) + } + + /// this function will insert coding blobs and also automatically track erasure-related + /// metadata. If recovery is available it will be done + #[cfg(feature = "erasure")] + pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + let set_index = ErasureMeta::set_index_for(index); + let mut erasure_meta = self + .erasure_meta_cf + .get((slot, set_index))? + .unwrap_or_else(|| ErasureMeta::new(set_index)); + + erasure_meta.set_coding_present(index); + + let mut writebatch = self.db.batch()?; + + writebatch.put_bytes::((slot, index), bytes)?; + + writebatch.put::((slot, set_index), &erasure_meta)?; + + self.db.write(writebatch)?; + + if erasure_meta.can_recover() { + match self.recover(slot, set_index) { + Ok(recovered) => { + inc_new_counter_info!("erasures-recovered", recovered); + return Ok(()); + } + Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => { + let start_index = erasure_meta.start_index(); + let (_, coding_end_idx) = erasure_meta.end_indexes(); + let mut batch = self.db.batch()?; + + erasure_meta.coding = 0; + batch.put::((slot, set_index), &erasure_meta)?; + + for idx in start_index..coding_end_idx { + batch.delete::((slot, idx as u64))?; + } + + self.db.write(batch)?; + + return Ok(()); + } + Err(e) => return Err(e), + } + } + + Ok(()) + } + pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> { self.data_cf.put_bytes((slot, index), value) } @@ -1016,6 +1088,144 @@ impl Blocktree { Ok(()) } + #[cfg(feature = "erasure")] + /// Attempts recovery using erasure coding + fn recover(&self, slot: u64, set_index: u64) -> Result { + use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA}; + use crate::packet::BLOB_DATA_SIZE; + + let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap(); + + let start_idx = erasure_meta.start_index(); + let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); + + let mut erasures = Vec::with_capacity(NUM_CODING + 1); + let (mut data, mut coding) = (vec![], vec![]); + let mut size = 0; + + for i in start_idx..coding_end_idx { + if erasure_meta.is_coding_present(i) { + let blob_bytes = self + .erasure_cf + .get_bytes((slot, i))? + .expect("erasure_meta must have no false positives"); + + if size == 0 { + size = blob_bytes.len() - BLOB_HEADER_SIZE; + } + + coding.push(blob_bytes); + } else { + let set_relative_idx = (i - start_idx) + NUM_DATA as u64; + coding.push(vec![0; crate::packet::BLOB_SIZE]); + erasures.push(set_relative_idx as i32); + } + } + + assert_ne!(size, 0); + + for i in start_idx..data_end_idx { + if erasure_meta.is_data_present(i) { + let mut blob_bytes = self + .data_cf + .get_bytes((slot, i))? + .expect("erasure_meta must have no false positives"); + + // If data is too short, extend it with zeroes + if blob_bytes.len() < size { + blob_bytes.resize(size, 0u8); + } + + data.push(blob_bytes); + } else { + let set_relative_index = i - start_idx; + data.push(vec![0; size]); + // data erasures must come before any coding erasures if present + erasures.insert(0, set_relative_index as i32); + } + } + + let mut coding_ptrs: Vec<_> = coding + .iter_mut() + .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size]) + .collect(); + + let mut data_ptrs: Vec<_> = data + .iter_mut() + .map(|data_bytes| &mut data_bytes[..size]) + .collect(); + + // Marks the end + erasures.push(-1); + trace!("erasures: {:?}, size: {}", erasures, size); + + erasure::decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + + // Create the missing blobs from the reconstructed data + let block_start_idx = erasure_meta.start_index(); + let (mut recovered_data, mut recovered_coding) = (vec![], vec![]); + + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + + let (data_size, idx, first_byte); + + if n < NUM_DATA { + let mut blob = Blob::new(&data_ptrs[n]); + + idx = n as u64 + block_start_idx; + data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; + first_byte = blob.data[0]; + + if data_size > BLOB_DATA_SIZE { + error!("corrupt data blob[{}] data_size: {}", idx, data_size); + return Err(Error::ErasureError(ErasureError::CorruptCoding)); + } + + blob.set_slot(slot); + blob.set_index(idx); + blob.set_size(data_size); + recovered_data.push(blob); + } else { + let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]); + + idx = (n - NUM_DATA) as u64 + block_start_idx; + data_size = size; + first_byte = blob.data[0]; + + if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { + error!("corrupt coding blob[{}] data_size: {}", idx, data_size); + return Err(Error::ErasureError(ErasureError::CorruptCoding)); + } + + blob.set_slot(slot); + blob.set_index(idx); + blob.set_data_size(data_size as u64); + recovered_coding.push(blob); + } + + trace!( + "erasures[{}] ({}) size: {} data[0]: {}", + *i, + idx, + data_size, + first_byte, + ); + } + + self.write_blobs(recovered_data)?; + + for blob in recovered_coding { + self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?; + } + + Ok(erasures.len() - 1) + } + /// Returns the next consumed index and the number of ticks in the new consumed /// range fn get_slot_consecutive_blobs<'a>( @@ -2484,6 +2694,282 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[cfg(feature = "erasure")] + mod erasure { + use super::*; + use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; + use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; + use rand::{thread_rng, Rng}; + use std::sync::RwLock; + + impl Into for Blob { + fn into(self) -> SharedBlob { + Arc::new(RwLock::new(self)) + } + } + + #[test] + fn test_erasure_meta_accuracy() { + let path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&path).unwrap(); + + // one erasure set + half of the next + let num_blobs = 24; + let slot = 0; + + let (blobs, _) = make_slot_entries(slot, 0, num_blobs); + let shared_blobs: Vec<_> = blobs + .iter() + .cloned() + .map(|blob| Arc::new(RwLock::new(blob))) + .collect(); + + blocktree.write_blobs(&blobs[8..16]).unwrap(); + + let erasure_meta_opt = blocktree + .erasure_meta_cf + .get((slot, 0)) + .expect("DB get must succeed"); + + assert!(erasure_meta_opt.is_some()); + let erasure_meta = erasure_meta_opt.unwrap(); + + assert_eq!(erasure_meta.data, 0xFF00); + assert_eq!(erasure_meta.coding, 0x0); + + blocktree.write_blobs(&blobs[..8]).unwrap(); + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 0)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.data, 0xFFFF); + assert_eq!(erasure_meta.coding, 0x0); + + blocktree.write_blobs(&blobs[16..]).unwrap(); + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 1)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.data, 0x00FF); + assert_eq!(erasure_meta.coding, 0x0); + + let mut coding_generator = CodingGenerator::new(); + let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]).unwrap(); + + for shared_coding_blob in coding_blobs { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) + .unwrap(); + } + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 0)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.data, 0xFFFF); + assert_eq!(erasure_meta.coding, 0x0F); + } + + #[test] + pub fn test_recovery_basic() { + solana_logger::setup(); + + let slot = 0; + + let ledger_path = get_tmp_ledger_path!(); + + let blocktree = Blocktree::open(&ledger_path).unwrap(); + let data_blobs = make_slot_entries(slot, 0, 3 * NUM_DATA as u64) + .0 + .into_iter() + .map(Blob::into) + .collect::>(); + + let mut coding_generator = CodingGenerator::new(); + + for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { + let focused_index = (set_index + 1) * NUM_DATA - 1; + let coding_blobs = coding_generator.next(&data_blobs).unwrap(); + assert_eq!(coding_blobs.len(), NUM_CODING); + + let deleted_data = data_blobs[NUM_DATA - 1].clone(); + debug!( + "deleted: slot: {}, index: {}", + deleted_data.read().unwrap().slot(), + deleted_data.read().unwrap().index() + ); + + blocktree + .write_shared_blobs(&data_blobs[..NUM_DATA - 1]) + .unwrap(); + + // this should trigger recovery + for shared_coding_blob in coding_blobs { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + + blocktree + .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) + .expect("Inserting coding blobs must succeed"); + (slot, blob.index()); + } + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, set_index as u64)) + .expect("Erasure Meta should be present") + .unwrap(); + + assert_eq!(erasure_meta.data, 0xFFFF); + assert_eq!(erasure_meta.coding, 0x0F); + + let retrieved_data = blocktree + .data_cf + .get_bytes((slot, focused_index as u64)) + .unwrap(); + + assert!(retrieved_data.is_some()); + + let data_blob = Blob::new(&retrieved_data.unwrap()); + + assert_eq!(&data_blob, &*deleted_data.read().unwrap()); + } + + drop(blocktree); + + Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction"); + } + + /// FIXME: JERASURE Threading: see Issue + /// [#3725](https://github.com/solana-labs/solana/issues/3725) + #[test] + fn test_recovery_multi_slot_multi_thread() { + use std::thread; + + const USE_THREADS: bool = true; + let slots = vec![0, 3, 5, 50, 100]; + let max_erasure_sets = 16; + solana_logger::setup(); + + let path = get_tmp_ledger_path!(); + let mut rng = thread_rng(); + + // Specification should generate a ledger where each slot has an random number of + // erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones + // will have between 1-4 data blobs missing and all coding blobs + let specs = slots + .iter() + .map(|&slot| { + let num_erasure_sets = rng.gen_range(0, max_erasure_sets); + + let set_specs = (0..num_erasure_sets) + .map(|set_index| { + let (num_data, num_coding) = if set_index % 2 == 0 { + (NUM_DATA - rng.gen_range(1, 5), NUM_CODING) + } else { + (NUM_DATA, 0) + }; + ErasureSpec { + set_index, + num_data, + num_coding, + } + }) + .collect(); + + SlotSpec { slot, set_specs } + }) + .collect::>(); + + let model = generate_ledger_model(&specs); + let blocktree = Arc::new(Blocktree::open(&path).unwrap()); + + // Write to each slot in a different thread simultaneously. + // These writes should trigger the recovery. Every erasure set should have all of its + // data blobs + let mut handles = vec![]; + + for slot_model in model.clone() { + let blocktree = Arc::clone(&blocktree); + let slot = slot_model.slot; + let closure = move || { + for erasure_set in slot_model.chunks { + blocktree + .write_shared_blobs(erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + + for shared_coding_blob in erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } + }; + + if USE_THREADS { + handles.push(thread::spawn(closure)); + } else { + closure(); + } + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); + + for slot_model in model { + let slot = slot_model.slot; + + for erasure_set_model in slot_model.chunks { + let set_index = erasure_set_model.set_index as u64; + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, set_index)) + .expect("DB get must succeed") + .expect("ErasureMeta must be present for each erasure set"); + + debug!( + "multislot: got erasure_meta: slot: {}, set_index: {}, erasure_meta: {:?}", + slot, set_index, erasure_meta + ); + + // all possibility for recovery should be exhausted + assert!(!erasure_meta.can_recover()); + // Should have all data + assert_eq!(erasure_meta.data, 0xFFFF); + if set_index % 2 == 0 { + // Even sets have all coding + assert_eq!(erasure_meta.coding, 0x0F); + } + } + } + + drop(blocktree); + Blocktree::destroy(&path).expect("Blocktree destruction must succeed"); + } + } + pub fn entries_to_blobs( entries: &Vec, slot: u64, diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 9356e8e9a4..4bdb8ad2bc 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -27,6 +27,11 @@ pub mod columns { #[derive(Debug)] /// Data Column pub struct Data; + + #[cfg(feature = "erasure")] + #[derive(Debug)] + /// The erasure meta column + pub struct ErasureMeta; } pub trait Backend: Sized + Send + Sync { diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index ce6025105f..d5cc0784e8 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -138,6 +138,30 @@ impl TypedColumn for cf::SlotMeta { type Type = super::SlotMeta; } +#[cfg(feature = "erasure")] +impl Column for cf::ErasureMeta { + const NAME: &'static str = super::ERASURE_META_CF; + type Index = (u64, u64); + + fn key((slot, set_index): (u64, u64)) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], slot); + BigEndian::write_u64(&mut key.0[16..], set_index); + key + } + + fn index(key: &Key) -> (u64, u64) { + let slot = BigEndian::read_u64(&key.0[8..16]); + let set_index = BigEndian::read_u64(&key.0[16..]); + (slot, set_index) + } +} + +#[cfg(feature = "erasure")] +impl TypedColumn for cf::ErasureMeta { + type Type = super::ErasureMeta; +} + impl DbCursor for Dummy { fn valid(&self) -> bool { unimplemented!() diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs new file mode 100644 index 0000000000..1b217390e7 --- /dev/null +++ b/core/src/blocktree/meta.rs @@ -0,0 +1,181 @@ +#[cfg(feature = "erasure")] +use crate::erasure::{NUM_CODING, NUM_DATA}; + +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +// The Meta column family +pub struct SlotMeta { + // The number of slots above the root (the genesis block). The first + // slot has slot 0. + pub slot: u64, + // The total number of consecutive blobs starting from index 0 + // we have received for this slot. + pub consumed: u64, + // The index *plus one* of the highest blob received for this slot. Useful + // for checking if the slot has received any blobs yet, and to calculate the + // range where there is one or more holes: `(consumed..received)`. + pub received: u64, + // The index of the blob that is flagged as the last blob for this slot. + pub last_index: u64, + // The slot height of the block this one derives from. + pub parent_slot: u64, + // The list of slot heights, each of which contains a block that derives + // from this one. + pub next_slots: Vec, + // True if this slot is full (consumed == last_index + 1) and if every + // slot that is a parent of this slot is also connected. + pub is_connected: bool, +} + +impl SlotMeta { + pub fn is_full(&self) -> bool { + // last_index is std::u64::MAX when it has no information about how + // many blobs will fill this slot. + // Note: A full slot with zero blobs is not possible. + if self.last_index == std::u64::MAX { + return false; + } + assert!(self.consumed <= self.last_index + 1); + + self.consumed == self.last_index + 1 + } + + pub fn is_parent_set(&self) -> bool { + self.parent_slot != std::u64::MAX + } + + pub(in crate::blocktree) fn new(slot: u64, parent_slot: u64) -> Self { + SlotMeta { + slot, + consumed: 0, + received: 0, + parent_slot, + next_slots: vec![], + is_connected: slot == 0, + last_index: std::u64::MAX, + } + } +} + +#[cfg(feature = "erasure")] +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +/// Erasure coding information +pub struct ErasureMeta { + /// Which erasure set in the slot this is + pub set_index: u64, + /// Bitfield representing presence/absence of data blobs + pub data: u64, + /// Bitfield representing presence/absence of coding blobs + pub coding: u64, +} + +#[cfg(feature = "erasure")] +impl ErasureMeta { + pub fn new(set_index: u64) -> ErasureMeta { + ErasureMeta { + set_index, + data: 0, + coding: 0, + } + } + + pub fn can_recover(&self) -> bool { + let (data_missing, coding_missing) = ( + NUM_DATA - self.data.count_ones() as usize, + NUM_CODING - self.coding.count_ones() as usize, + ); + + data_missing > 0 && data_missing + coding_missing <= NUM_CODING + } + + pub fn is_coding_present(&self, index: u64) -> bool { + let set_index = Self::set_index_for(index); + let position = index - self.start_index(); + + set_index == self.set_index && self.coding & (1 << position) != 0 + } + + pub fn set_coding_present(&mut self, index: u64) { + let set_index = Self::set_index_for(index); + + if set_index as u64 == self.set_index { + let position = index - self.start_index(); + + self.coding |= 1 << position; + } + } + + pub fn is_data_present(&self, index: u64) -> bool { + let set_index = Self::set_index_for(index); + let position = index - self.start_index(); + + set_index == self.set_index && self.data & (1 << position) != 0 + } + + pub fn set_data_present(&mut self, index: u64) { + let set_index = Self::set_index_for(index); + + if set_index as u64 == self.set_index { + let position = index - self.start_index(); + + self.data |= 1 << position; + } + } + + pub fn set_index_for(index: u64) -> u64 { + index / NUM_DATA as u64 + } + + pub fn start_index(&self) -> u64 { + self.set_index * NUM_DATA as u64 + } + + /// returns a tuple of (data_end, coding_end) + pub fn end_indexes(&self) -> (u64, u64) { + let start = self.start_index(); + (start + NUM_DATA as u64, start + NUM_CODING as u64) + } +} + +#[cfg(feature = "erasure")] +#[test] +fn test_can_recover() { + let set_index = 0; + let mut e_meta = ErasureMeta { + set_index, + data: 0, + coding: 0, + }; + + assert!(!e_meta.can_recover()); + + e_meta.data = 0b1111_1111_1111_1111; + e_meta.coding = 0x00; + + assert!(!e_meta.can_recover()); + + e_meta.coding = 0x0e; + assert_eq!(0x0fu8, 0b0000_1111u8); + assert!(!e_meta.can_recover()); + + e_meta.data = 0b0111_1111_1111_1111; + assert!(e_meta.can_recover()); + + e_meta.data = 0b0111_1111_1111_1110; + assert!(e_meta.can_recover()); + + e_meta.data = 0b0111_1111_1011_1110; + assert!(e_meta.can_recover()); + + e_meta.data = 0b0111_1011_1011_1110; + assert!(!e_meta.can_recover()); + + e_meta.data = 0b0111_1011_1011_1110; + assert!(!e_meta.can_recover()); + + e_meta.coding = 0b0000_1110; + e_meta.data = 0b1111_1111_1111_1100; + assert!(e_meta.can_recover()); + + e_meta.data = 0b1111_1111_1111_1000; + assert!(e_meta.can_recover()); +} diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index 08e8b89f15..0f3bdaf526 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -30,6 +30,8 @@ impl Backend for Rocks { type Error = rocksdb::Error; fn open(path: &Path) -> Result { + #[cfg(feature = "erasure")] + use crate::blocktree::db::columns::ErasureMeta; use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta}; fs::create_dir_all(&path)?; @@ -41,12 +43,17 @@ impl Backend for Rocks { let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); + #[cfg(feature = "erasure")] + let erasure_meta_cf_descriptor = + ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); let cfs = vec![ meta_cf_descriptor, data_cf_descriptor, erasure_cf_descriptor, + #[cfg(feature = "erasure")] + erasure_meta_cf_descriptor, orphans_cf_descriptor, ]; @@ -57,9 +64,18 @@ impl Backend for Rocks { } fn columns(&self) -> Vec<&'static str> { + #[cfg(feature = "erasure")] + use crate::blocktree::db::columns::ErasureMeta; use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta}; - vec![Coding::NAME, Data::NAME, Orphans::NAME, SlotMeta::NAME] + vec![ + Coding::NAME, + #[cfg(feature = "erasure")] + ErasureMeta::NAME, + Data::NAME, + Orphans::NAME, + SlotMeta::NAME, + ] } fn destroy(path: &Path) -> Result<()> { @@ -180,6 +196,31 @@ impl TypedColumn for cf::SlotMeta { type Type = super::SlotMeta; } +#[cfg(feature = "erasure")] +impl Column for cf::ErasureMeta { + const NAME: &'static str = super::ERASURE_META_CF; + type Index = (u64, u64); + + fn index(key: &[u8]) -> (u64, u64) { + let slot = BigEndian::read_u64(&key[..8]); + let set_index = BigEndian::read_u64(&key[8..]); + + (slot, set_index) + } + + fn key((slot, set_index): (u64, u64)) -> Vec { + let mut key = vec![0; 16]; + BigEndian::write_u64(&mut key[..8], slot); + BigEndian::write_u64(&mut key[8..], set_index); + key + } +} + +#[cfg(feature = "erasure")] +impl TypedColumn for cf::ErasureMeta { + type Type = super::ErasureMeta; +} + impl DbCursor for DBRawIterator { fn valid(&self) -> bool { DBRawIterator::valid(self) diff --git a/core/src/erasure.rs b/core/src/erasure.rs index 1b37038a80..7838abb51d 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -1,6 +1,5 @@ // Support erasure coding -use crate::blocktree::Blocktree; -use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; +use crate::packet::{Blob, SharedBlob}; use crate::result::{Error, Result}; use std::cmp; use std::sync::{Arc, RwLock}; @@ -25,6 +24,7 @@ pub enum ErasureError { EncodeError, InvalidBlockSize, InvalidBlobData, + CorruptCoding, } // k = number of data devices @@ -53,6 +53,21 @@ extern "C" { size: i32, ) -> i32; fn galois_single_divide(a: i32, b: i32, w: i32) -> i32; + fn galois_init_default_field(w: i32) -> i32; +} + +use std::sync::Once; +static ERASURE_W_ONCE: Once = Once::new(); + +fn w() -> i32 { + let w = 32; + unsafe { + ERASURE_W_ONCE.call_once(|| { + galois_init_default_field(w); + () + }); + } + w } fn get_matrix(m: i32, k: i32, w: i32) -> Vec { @@ -67,8 +82,6 @@ fn get_matrix(m: i32, k: i32, w: i32) -> Vec { matrix } -const ERASURE_W: i32 = 32; - // Generate coding blocks into coding // There are some alignment restrictions, blocks should be aligned by 16 bytes // which means their size should be >= 16 bytes @@ -79,7 +92,7 @@ fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<() let k = data.len() as i32; let m = coding.len() as i32; let block_len = data[0].len() as i32; - let matrix: Vec = get_matrix(m, k, ERASURE_W); + let matrix: Vec = get_matrix(m, k, w()); let mut data_arg = Vec::with_capacity(data.len()); for block in data { if block_len != block.len() as i32 { @@ -109,7 +122,7 @@ fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<() jerasure_matrix_encode( k, m, - ERASURE_W, + w(), matrix.as_ptr(), data_arg.as_ptr(), coding_arg.as_ptr(), @@ -123,12 +136,16 @@ fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<() // data: array of blocks to recover into // coding: arry of coding blocks // erasures: list of indices in data where blocks should be recovered -fn decode_blocks(data: &mut [&mut [u8]], coding: &mut [&mut [u8]], erasures: &[i32]) -> Result<()> { +pub fn decode_blocks( + data: &mut [&mut [u8]], + coding: &mut [&mut [u8]], + erasures: &[i32], +) -> Result<()> { if data.is_empty() { return Ok(()); } let block_len = data[0].len(); - let matrix: Vec = get_matrix(coding.len() as i32, data.len() as i32, ERASURE_W); + let matrix: Vec = get_matrix(coding.len() as i32, data.len() as i32, w()); // generate coding pointers, blocks should be the same size let mut coding_arg: Vec<*mut u8> = Vec::new(); @@ -151,7 +168,7 @@ fn decode_blocks(data: &mut [&mut [u8]], coding: &mut [&mut [u8]], erasures: &[i jerasure_matrix_decode( data.len() as i32, coding.len() as i32, - ERASURE_W, + w(), matrix.as_ptr(), 0, erasures.as_ptr(), @@ -171,90 +188,17 @@ fn decode_blocks(data: &mut [&mut [u8]], coding: &mut [&mut [u8]], erasures: &[i Ok(()) } -fn decode_blobs( - blobs: &[SharedBlob], - erasures: &[i32], - size: usize, - block_start_idx: u64, - slot: u64, -) -> Result { - let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); - - assert!(blobs.len() == NUM_DATA + NUM_CODING); - for b in blobs { - locks.push(b.write().unwrap()); - } - - for (i, l) in locks.iter_mut().enumerate() { - if i < NUM_DATA { - data_ptrs.push(&mut l.data[..size]); - } else { - coding_ptrs.push(&mut l.data_mut()[..size]); - } - } - - // Decode the blocks - decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, - )?; - - // Create the missing blobs from the reconstructed data - let mut corrupt = false; - - for i in &erasures[..erasures.len() - 1] { - let n = *i as usize; - let mut idx = n as u64 + block_start_idx; - - let mut data_size; - if n < NUM_DATA { - data_size = locks[n].data_size() as usize; - data_size -= BLOB_HEADER_SIZE; - if data_size > BLOB_DATA_SIZE { - error!("corrupt data blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } else { - data_size = size; - idx -= NUM_CODING as u64; - locks[n].set_slot(slot); - locks[n].set_index(idx); - - if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { - error!("corrupt coding blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } - - locks[n].set_size(data_size); - trace!( - "erasures[{}] ({}) size: {} data[0]: {}", - *i, - idx, - data_size, - locks[n].data()[0] - ); - } - - Ok(corrupt) -} - // Generate coding blocks in window starting from start_idx, // for num_blobs.. For each block place the coding blobs -// at the end of the block like so: +// at the start of the block like so: // -// block-size part of a Window, with each element a WindowSlot.. +// model of an erasure set, with top row being data blobs and second being coding // |<======================= NUM_DATA ==============================>| -// |<==== NUM_CODING ===>| +// |<==== NUM_CODING ===>| // +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ // | D | | D | | D | | D | | D | | D | | D | | D | | D | | D | // +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+ -// | | | | | | | | | | | | | C | | C | | C | | C | +// | C | | C | | C | | C | | | | | | | | | | | | | // +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ // // blob structure for coding, recover @@ -285,12 +229,18 @@ pub struct CodingGenerator { leftover: Vec, // SharedBlobs that couldn't be used in last call to next() } -impl CodingGenerator { - pub fn new() -> Self { - Self { +impl Default for CodingGenerator { + fn default() -> Self { + CodingGenerator { leftover: Vec::with_capacity(NUM_DATA), } } +} + +impl CodingGenerator { + pub fn new() -> Self { + Self::default() + } // must be called with consecutive data blobs from previous invocation pub fn next(&mut self, next_data: &[SharedBlob]) -> Result> { @@ -327,23 +277,21 @@ impl CodingGenerator { let mut coding_blobs = Vec::with_capacity(NUM_CODING); - for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] { + for data_blob in &data_locks[..NUM_CODING] { let index = data_blob.index(); let slot = data_blob.slot(); let id = data_blob.id(); let should_forward = data_blob.should_forward(); - let coding_blob = SharedBlob::default(); - { - let mut coding_blob = coding_blob.write().unwrap(); - coding_blob.set_index(index); - coding_blob.set_slot(slot); - coding_blob.set_id(&id); - coding_blob.forward(should_forward); - coding_blob.set_size(max_data_size); - coding_blob.set_coding(); - } - coding_blobs.push(coding_blob); + let mut coding_blob = Blob::default(); + coding_blob.set_index(index); + coding_blob.set_slot(slot); + coding_blob.set_id(&id); + coding_blob.forward(should_forward); + coding_blob.set_size(max_data_size); + coding_blob.set_coding(); + + coding_blobs.push(Arc::new(RwLock::new(coding_blob))); } { @@ -364,158 +312,23 @@ impl CodingGenerator { } } -// Recover the missing data and coding blobs from the input ledger. Returns a vector -// of the recovered missing data blobs and a vector of the recovered coding blobs -pub fn recover( - blocktree: &Blocktree, - slot: u64, - start_idx: u64, -) -> Result<(Vec, Vec)> { - let block_start_idx = start_idx - (start_idx % NUM_DATA as u64); - - debug!("block_start_idx: {}", block_start_idx); - - let coding_start_idx = block_start_idx + NUM_DATA as u64 - NUM_CODING as u64; - let block_end_idx = block_start_idx + NUM_DATA as u64; - trace!( - "recover: coding_start_idx: {} block_end_idx: {}", - coding_start_idx, - block_end_idx - ); - - let data_missing = blocktree - .find_missing_data_indexes(slot, block_start_idx, block_end_idx, NUM_DATA) - .len(); - let coding_missing = blocktree - .find_missing_coding_indexes(slot, coding_start_idx, block_end_idx, NUM_CODING) - .len(); - - // if we're not missing data, or if we have too much missing but have enough coding - if data_missing == 0 { - // nothing to do... - return Ok((vec![], vec![])); - } - - if (data_missing + coding_missing) > NUM_CODING { - trace!( - "recover: start: {} skipping recovery data: {} coding: {}", - block_start_idx, - data_missing, - coding_missing - ); - // nothing to do... - return Err(Error::ErasureError(ErasureError::NotEnoughBlocksToDecode)); - } - - trace!( - "recover: recovering: data: {} coding: {}", - data_missing, - coding_missing - ); - - let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); - let mut erasures: Vec = Vec::with_capacity(NUM_CODING); - - let mut missing_data: Vec = vec![]; - let mut missing_coding: Vec = vec![]; - - // Add the data blobs we have into the recovery vector, mark the missing ones - for i in block_start_idx..block_end_idx { - let result = blocktree.get_data_blob_bytes(slot, i)?; - - categorize_blob( - &result, - &mut blobs, - &mut missing_data, - &mut erasures, - (i - block_start_idx) as i32, - )?; - } - - let mut size = None; - // Add the coding blobs we have into the recovery vector, mark the missing ones - for i in coding_start_idx..block_end_idx { - let result = blocktree.get_coding_blob_bytes(slot, i)?; - - categorize_blob( - &result, - &mut blobs, - &mut missing_coding, - &mut erasures, - ((i - coding_start_idx) + NUM_DATA as u64) as i32, - )?; - - if let Some(b) = result { - if size.is_none() { - size = Some(b.len() - BLOB_HEADER_SIZE); - } - } - } - // Due to checks above verifying that (data_missing + coding_missing) <= NUM_CODING and - // data_missing > 0, we know at least one coding block must exist, so "size" can - // not remain None after the above processing. - let size = size.unwrap(); - - // marks end of erasures - erasures.push(-1); - - trace!("erasures[]:{:?} data_size: {}", erasures, size,); - - let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx, slot)?; - - if corrupt { - // Remove the corrupted coding blobs so there's no effort wasted in trying to - // reconstruct the blobs again - for i in coding_start_idx..block_end_idx { - blocktree.delete_coding_blob(slot, i)?; - } - return Ok((vec![], vec![])); - } - - Ok((missing_data, missing_coding)) -} - -fn categorize_blob( - get_blob_result: &Option>, - blobs: &mut Vec, - missing: &mut Vec, - erasures: &mut Vec, - erasure_index: i32, -) -> Result<()> { - match get_blob_result { - Some(b) => { - if b.len() <= BLOB_HEADER_SIZE || b.len() > BLOB_SIZE { - return Err(Error::ErasureError(ErasureError::InvalidBlobData)); - } - blobs.push(Arc::new(RwLock::new(Blob::new(&b)))); - } - None => { - // Mark the missing memory - erasures.push(erasure_index); - let b = SharedBlob::default(); - blobs.push(b.clone()); - missing.push(b); - } - } - - Ok(()) -} - #[cfg(test)] pub mod test { use super::*; use crate::blocktree::get_tmp_ledger_path; use crate::blocktree::Blocktree; use crate::entry::{make_tiny_test_entries, EntrySlice}; - use crate::packet::{index_blobs, SharedBlob}; + use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::borrow::Borrow; /// Specifies the contents of a 16-data-blob and 4-coding-blob erasure set /// Exists to be passed to `generate_blocktree_with_coding` #[derive(Debug, Copy, Clone)] pub struct ErasureSpec { /// Which 16-blob erasure set this represents - pub set_index: usize, + pub set_index: u64, pub num_data: usize, pub num_coding: usize, } @@ -528,6 +341,23 @@ pub mod test { pub set_specs: Vec, } + /// Model of a slot in 16-blob chunks with varying amounts of erasure and coding blobs + /// present + #[derive(Debug, Clone)] + pub struct SlotModel { + pub slot: u64, + pub chunks: Vec, + } + + /// Model of 16-blob chunk + #[derive(Debug, Clone)] + pub struct ErasureSetModel { + pub set_index: u64, + pub start_index: u64, + pub coding: Vec, + pub data: Vec, + } + #[test] fn test_coding() { let zero_vec = vec![0; 16]; @@ -548,9 +378,9 @@ pub mod test { ) .is_ok()); } - trace!("coding blocks:"); + trace!("test_coding: coding blocks:"); for b in &coding_blocks { - trace!("{:?}", b); + trace!("test_coding: {:?}", b); } let erasure: i32 = 1; let erasures = vec![erasure, -1]; @@ -570,9 +400,9 @@ pub mod test { .is_ok()); } - trace!("vs:"); + trace!("test_coding: vs:"); for v in &vs { - trace!("{:?}", v); + trace!("test_coding: {:?}", v); } assert_eq!(v_orig, vs[0]); } @@ -605,14 +435,14 @@ pub mod test { let erasures: Vec = vec![0, NUM_DATA as i32, -1]; let block_start_idx = i - (i % NUM_DATA); - let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); + let mut blobs: Vec = Vec::with_capacity(ERASURE_SET_SIZE); blobs.push(SharedBlob::default()); // empty data, erasure at zero for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] { // skip first blob blobs.push(blob.clone()); } - blobs.push(SharedBlob::default()); // empty coding, erasure at NUM_DATA + blobs.push(SharedBlob::default()); // empty coding, erasure at zero for blob in &coding[1..NUM_CODING] { blobs.push(blob.clone()); } @@ -694,21 +524,19 @@ pub mod test { let slot = spec.slot; for erasure_spec in spec.set_specs.iter() { - let set_index = erasure_spec.set_index as u64; - let start_index = set_index * NUM_DATA as u64; + let start_index = erasure_spec.set_index * NUM_DATA as u64; + let (data_end, coding_end) = ( + start_index + erasure_spec.num_data as u64, + start_index + erasure_spec.num_coding as u64, + ); - for i in 0..erasure_spec.num_data as u64 { - let opt_bytes = blocktree - .get_data_blob_bytes(slot, start_index + i) - .unwrap(); + for idx in start_index..data_end { + let opt_bytes = blocktree.get_data_blob_bytes(slot, idx).unwrap(); assert!(opt_bytes.is_some()); } - for i in 0..erasure_spec.num_coding as u64 { - let coding_start_index = start_index as usize + (NUM_DATA - NUM_CODING); - let opt_bytes = blocktree - .get_coding_blob_bytes(slot, coding_start_index as u64 + i) - .unwrap(); + for idx in start_index..coding_end { + let opt_bytes = blocktree.get_coding_blob_bytes(slot, idx).unwrap(); assert!(opt_bytes.is_some()); } } @@ -719,123 +547,197 @@ pub mod test { } } + /// This test is ignored because if successful, it never stops running. It is useful for + /// dicovering an initialization race-condition in the erasure FFI bindings. If this bug + /// re-emerges, running with `Z_THREADS = N` where `N > 1` should crash fairly rapidly. + #[ignore] #[test] - fn test_blocktree_recover_basic() { - let ledger_path = get_tmp_ledger_path!(); + fn test_recovery_with_model() { + use std::env; + use std::sync::{Arc, Mutex}; + use std::thread; - // Missing 1 data blob - let spec = SlotSpec { - slot: 0, - set_specs: vec![ErasureSpec { - set_index: 0, - num_data: NUM_DATA - 1, - num_coding: 4, - }], - }; + const MAX_ERASURE_SETS: u64 = 16; + solana_logger::setup(); + let n_threads: usize = env::var("Z_THREADS") + .unwrap_or("1".to_string()) + .parse() + .unwrap(); - let blocktree = generate_blocktree_with_coding(&ledger_path, &[spec]); + let specs = (0..).map(|slot| { + let num_erasure_sets = slot % MAX_ERASURE_SETS; - let (recovered_data, recovered_coding) = - recover(&blocktree, 0, 0).expect("Expect successful recovery"); - - assert!(recovered_coding.is_empty()); - - assert!(recovered_data.len() == 1); - - drop(blocktree); - Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction"); - } - - #[test] - fn test_blocktree_recover_basic2() { - let ledger_path = get_tmp_ledger_path!(); - - // Missing 1 data blob in [0, 16) - // [16..32) complete - let spec1 = SlotSpec { - slot: 0, - set_specs: vec![ - ErasureSpec { - set_index: 0, - num_data: NUM_DATA - 1, - num_coding: NUM_CODING, - }, - ErasureSpec { - set_index: 1, + let set_specs = (0..num_erasure_sets) + .map(|set_index| ErasureSpec { + set_index, num_data: NUM_DATA, num_coding: NUM_CODING, - }, - ], - }; + }) + .collect(); - // Missing 1 coding and 1 data blbo - let spec2 = SlotSpec { - slot: 3, - set_specs: vec![ErasureSpec { - set_index: 3, - num_data: NUM_DATA - 1, - num_coding: NUM_CODING - 1, - }], - }; + SlotSpec { slot, set_specs } + }); - let blocktree = generate_blocktree_with_coding(&ledger_path, &[spec1, spec2]); + let decode_mutex = Arc::new(Mutex::new(())); + let mut handles = vec![]; - let (recovered_data, recovered_coding) = - recover(&blocktree, 0, 0).expect("Expect successful recovery"); + for i in 0..n_threads { + let specs = specs.clone(); + let decode_mutex = Arc::clone(&decode_mutex); - assert!(recovered_coding.is_empty()); - assert_eq!(recovered_data.len(), 1); + let handle = thread::Builder::new() + .name(i.to_string()) + .spawn(move || { + for slot_model in generate_ledger_model(specs) { + for erasure_set in slot_model.chunks { + let erased_coding = erasure_set.coding[0].clone(); + let erased_data = erasure_set.data[..3].to_vec(); - let (recovered_data, recovered_coding) = - recover(&blocktree, 0, NUM_DATA as u64).expect("Expect successful recovery"); + let mut data = Vec::with_capacity(NUM_DATA); + let mut coding = Vec::with_capacity(NUM_CODING); + let erasures = vec![0, 1, 2, NUM_DATA as i32, -1]; - assert!(recovered_coding.is_empty()); - assert!(recovered_data.is_empty()); + data.push(SharedBlob::default()); + data.push(SharedBlob::default()); + data.push(SharedBlob::default()); + for blob in erasure_set.data.into_iter().skip(3) { + data.push(blob); + } - let (recovered_data, recovered_coding) = - recover(&blocktree, 3, 3 * NUM_DATA as u64).expect("Expect successful recovery"); + coding.push(SharedBlob::default()); + for blob in erasure_set.coding.into_iter().skip(1) { + coding.push(blob); + } - assert_eq!(recovered_coding.len(), 1); - assert_eq!(recovered_data.len(), 1); + let size = erased_coding.read().unwrap().data_size() as usize; - drop(blocktree); - Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction"); + let mut data_locks: Vec<_> = + data.iter().map(|shared| shared.write().unwrap()).collect(); + let mut coding_locks: Vec<_> = coding + .iter() + .map(|shared| shared.write().unwrap()) + .collect(); + + let mut data_ptrs: Vec<_> = data_locks + .iter_mut() + .map(|blob| &mut blob.data[..size]) + .collect(); + let mut coding_ptrs: Vec<_> = coding_locks + .iter_mut() + .map(|blob| &mut blob.data_mut()[..size]) + .collect(); + + { + let _lock = decode_mutex.lock(); + + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + ) + .expect("decoding must succeed"); + } + + drop(coding_locks); + drop(data_locks); + + for (expected, recovered) in erased_data.iter().zip(data.iter()) { + let expected = expected.read().unwrap(); + let mut recovered = recovered.write().unwrap(); + let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE; + recovered.set_size(data_size); + let corrupt = data_size > BLOB_DATA_SIZE; + assert!(!corrupt, "CORRUPTION"); + assert_eq!(&*expected, &*recovered); + } + + assert_eq!( + erased_coding.read().unwrap().data(), + coding[0].read().unwrap().data() + ); + + debug!("passed set: {}", erasure_set.set_index); + } + debug!("passed slot: {}", slot_model.slot); + } + }) + .expect("thread build error"); + + handles.push(handle); + } + + handles.into_iter().for_each(|h| h.join().unwrap()); } - /// Genarates a ledger according to the given specs. Does not generate a valid ledger with - /// chaining and etc. + /// Generates a model of a ledger containing certain data and coding blobs according to a spec + pub fn generate_ledger_model<'a, I, IntoIt, S>( + specs: I, + ) -> impl Iterator + Clone + 'a + where + I: IntoIterator, + IntoIt: Iterator + Clone + 'a, + S: Borrow, + { + specs.into_iter().map(|spec| { + let spec = spec.borrow(); + let slot = spec.slot; + + let chunks = spec + .set_specs + .iter() + .map(|erasure_spec| { + let set_index = erasure_spec.set_index as usize; + let start_index = set_index * NUM_DATA; + + let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs(); + index_blobs( + &blobs, + &Keypair::new().pubkey(), + start_index as u64, + slot, + 0, + ); + + let mut coding_generator = CodingGenerator::new(); + let mut coding_blobs = coding_generator.next(&blobs).unwrap(); + + blobs.drain(erasure_spec.num_data..); + coding_blobs.drain(erasure_spec.num_coding..); + + ErasureSetModel { + start_index: start_index as u64, + set_index: set_index as u64, + data: blobs, + coding: coding_blobs, + } + }) + .collect(); + + SlotModel { slot, chunks } + }) + } + + /// Genarates a ledger according to the given specs. + /// Blocktree should have correct SlotMeta and ErasureMeta and so on but will not have done any + /// possible recovery. pub fn generate_blocktree_with_coding(ledger_path: &str, specs: &[SlotSpec]) -> Blocktree { let blocktree = Blocktree::open(ledger_path).unwrap(); - for spec in specs { - let slot = spec.slot; + let model = generate_ledger_model(specs); + for slot_model in model { + let slot = slot_model.slot; - for erasure_spec in spec.set_specs.iter() { - let set_index = erasure_spec.set_index as usize; - let start_index = set_index * NUM_DATA; + for erasure_set in slot_model.chunks { + blocktree.write_shared_blobs(erasure_set.data).unwrap(); - let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs(); - index_blobs(&blobs, &Pubkey::new_rand(), start_index as u64, slot, 0); - - let mut coding_generator = CodingGenerator::new(); - let mut coding_blobs = coding_generator.next(&blobs).unwrap(); - - blobs.drain(erasure_spec.num_data..); - coding_blobs.drain(erasure_spec.num_coding..); - - for shared_blob in blobs { - let blob = shared_blob.read().unwrap(); - let size = blob.size() as usize + BLOB_HEADER_SIZE; + for shared_coding_blob in erasure_set.coding.into_iter() { + let blob = shared_coding_blob.read().unwrap(); blocktree - .put_data_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) - .unwrap(); - } - - for shared_blob in coding_blobs { - let blob = shared_blob.read().unwrap(); - let size = blob.size() as usize + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) + .put_coding_blob_bytes_raw( + slot, + blob.index(), + &blob.data[..blob.size() + BLOB_HEADER_SIZE], + ) .unwrap(); } } @@ -851,4 +753,77 @@ pub mod test { blobs } + fn decode_blobs( + blobs: &[SharedBlob], + erasures: &[i32], + size: usize, + block_start_idx: u64, + slot: u64, + ) -> Result { + let mut locks = Vec::with_capacity(ERASURE_SET_SIZE); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); + let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); + + assert_eq!(blobs.len(), ERASURE_SET_SIZE); + for b in blobs { + locks.push(b.write().unwrap()); + } + + for (i, l) in locks.iter_mut().enumerate() { + if i < NUM_DATA { + data_ptrs.push(&mut l.data[..size]); + } else { + coding_ptrs.push(&mut l.data_mut()[..size]); + } + } + + // Decode the blocks + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + + // Create the missing blobs from the reconstructed data + let mut corrupt = false; + + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + let mut idx = n as u64 + block_start_idx; + + let mut data_size; + if n < NUM_DATA { + data_size = locks[n].data_size() as usize; + data_size -= BLOB_HEADER_SIZE; + if data_size > BLOB_DATA_SIZE { + error!("corrupt data blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } else { + data_size = size; + idx -= NUM_DATA as u64; + locks[n].set_slot(slot); + locks[n].set_index(idx); + + if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { + error!("corrupt coding blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } + + locks[n].set_size(data_size); + trace!( + "erasures[{}] ({}) size: {} data[0]: {}", + *i, + idx, + data_size, + locks[n].data()[0] + ); + } + + Ok(corrupt) + } + }