From 9a40ad76bd86c351de9022f6b082061b1047f1b3 Mon Sep 17 00:00:00 2001 From: "Mark E. Sinclair" <48664490+mark-solana@users.noreply.github.com> Date: Wed, 24 Apr 2019 17:53:01 -0500 Subject: [PATCH] Fix race in erasure metadata tracking (#3962) * Fix erasure metadata race condition * make erasure return the underlying error without wrapping it in the `solana::Error` type * Add metric for erasure failures * add tests to `ErasureMeta` indexing logic * Add test to ensure erasure recovery failures don't cause panics --- core/src/blocktree.rs | 579 +++++++++++++++++++++++++++---------- core/src/blocktree/meta.rs | 74 +++-- core/src/erasure.rs | 3 +- 3 files changed, 468 insertions(+), 188 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 0f2235347a..60a3d8b605 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -256,52 +256,21 @@ impl Blocktree { I::Item: Borrow, { let mut write_batch = self.db.batch()?; + let new_blobs: Vec<_> = new_blobs.into_iter().collect(); + let mut recovered_data = vec![]; + + let mut prev_inserted_blob_datas = HashMap::new(); // A map from slot to a 2-tuple of metadata: (working copy, backup copy), // so we can detect changes to the slot metadata later let mut slot_meta_working_set = HashMap::new(); let mut erasure_meta_working_set = HashMap::new(); - let new_blobs: Vec<_> = new_blobs.into_iter().collect(); - let mut prev_inserted_blob_datas = HashMap::new(); for blob in new_blobs.iter() { let blob = blob.borrow(); let blob_slot = blob.slot(); - let parent_slot = blob.parent(); - - // Check if we've already inserted the slot metadata for this blob's slot - let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| { - // Store a 2-tuple of the metadata (working copy, backup copy) - if let Some(mut meta) = self - .meta(blob_slot) - .expect("Expect database get to succeed") - { - let backup = Some(meta.clone()); - // If parent_slot == std::u64::MAX, then this is one of the orphans inserted - // during the chaining process, see the function find_slot_meta_in_cached_state() - // for details. Slots that are orphans are missing a parent_slot, so we should - // fill in the parent now that we know it. - if Self::is_orphan(&meta) { - meta.parent_slot = parent_slot; - } - - (Rc::new(RefCell::new(meta)), backup) - } else { - ( - Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))), - None, - ) - } - }); - - let slot_meta = &mut entry.0.borrow_mut(); - - // This slot is full, skip the bogus blob - if slot_meta.is_full() { - continue; - } let set_index = ErasureMeta::set_index_for(blob.index()); - let erasure_meta_entry = erasure_meta_working_set + erasure_meta_working_set .entry((blob_slot, set_index)) .or_insert_with(|| { self.erasure_meta_cf @@ -309,17 +278,43 @@ impl Blocktree { .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::new(set_index)) }); - - erasure_meta_entry.set_data_present(blob.index(), true); - - let _ = self.insert_data_blob( - blob, - &mut prev_inserted_blob_datas, - slot_meta, - &mut write_batch, - ); } + self.insert_data_blob_batch( + new_blobs.iter().map(Borrow::borrow), + &mut slot_meta_working_set, + &mut erasure_meta_working_set, + &mut prev_inserted_blob_datas, + &mut write_batch, + )?; + + for (&(slot, _), erasure_meta) in erasure_meta_working_set.iter_mut() { + if let Some((data, coding)) = + self.try_erasure_recover(&erasure_meta, slot, &prev_inserted_blob_datas, None)? 
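+            // Everything here operates on the in-memory working sets and the
+            // pending `write_batch`: recovered data blobs are re-inserted through
+            // the same batch path below, recovered coding blobs go into the batch
+            // directly, and nothing reaches the DB until the single
+            // `self.db.write(write_batch)` at the end of this function.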
+ { + for data_blob in data { + recovered_data.push(data_blob); + } + + for coding_blob in coding { + erasure_meta.set_coding_present(coding_blob.index(), true); + + write_batch.put_bytes::( + (coding_blob.slot(), coding_blob.index()), + &coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()], + )?; + } + } + } + + self.insert_data_blob_batch( + recovered_data.iter(), + &mut slot_meta_working_set, + &mut erasure_meta_working_set, + &mut prev_inserted_blob_datas, + &mut write_batch, + )?; + // Handle chaining for the working set self.handle_chaining(&mut write_batch, &slot_meta_working_set)?; let mut should_signal = false; @@ -335,21 +330,17 @@ impl Blocktree { } } - for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() { - write_batch.put::((*slot, *set_index), erasure_meta)?; + for ((slot, set_index), erasure_meta) in erasure_meta_working_set { + write_batch.put::((slot, set_index), &erasure_meta)?; } - self.db.write(write_batch)?; - if should_signal { for signal in self.new_blobs_signals.iter() { let _ = signal.try_send(true); } } - for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() { - self.try_erasure_recover(&erasure_meta, slot, set_index)?; - } + self.db.write(write_batch)?; Ok(()) } @@ -448,6 +439,8 @@ impl Blocktree { self.data_cf.put_bytes((slot, index), bytes) } + /// For benchmarks, testing, and setup. + /// Does no metadata tracking. Use with care. pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { self.erasure_cf.put_bytes((slot, index), bytes) } @@ -468,29 +461,125 @@ impl Blocktree { writebatch.put_bytes::((slot, index), bytes)?; + if let Some((data, coding)) = + self.try_erasure_recover(&erasure_meta, slot, &HashMap::new(), Some((index, bytes)))? 
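+        // The coding bytes being inserted are handed to recovery via
+        // `Some((index, bytes))`: at this point they live only in `writebatch`,
+        // so `recover` must take them from here instead of reading `erasure_cf`.
+        // An empty map is passed for `prev_inserted_blob_datas` because no data
+        // blobs are inserted on this path.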
+ { + let mut erasure_meta_working_set = HashMap::new(); + erasure_meta_working_set.insert((slot, set_index), erasure_meta); + + self.insert_data_blob_batch( + &data[..], + &mut HashMap::new(), + &mut erasure_meta_working_set, + &mut HashMap::new(), + &mut writebatch, + )?; + + erasure_meta = *erasure_meta_working_set.values().next().unwrap(); + + for coding_blob in coding { + erasure_meta.set_coding_present(coding_blob.index(), true); + + writebatch.put_bytes::( + (coding_blob.slot(), coding_blob.index()), + &coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()], + )?; + } + } + writebatch.put::((slot, set_index), &erasure_meta)?; self.db.write(writebatch)?; - self.try_erasure_recover(&erasure_meta, slot, set_index) + Ok(()) } fn try_erasure_recover( &self, erasure_meta: &ErasureMeta, slot: u64, - set_index: u64, - ) -> Result<()> { - match erasure_meta.status() { + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + new_coding_blob: Option<(u64, &[u8])>, + ) -> Result, Vec)>> { + use crate::erasure::ERASURE_SET_SIZE; + + let blobs = match erasure_meta.status() { ErasureMetaStatus::CanRecover => { - let recovered = self.recover(slot, set_index)?; - inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered); + let erasure_result = self.recover( + slot, + erasure_meta, + prev_inserted_blob_datas, + new_coding_blob, + ); + + match erasure_result { + Ok((data, coding)) => { + let recovered = data.len() + coding.len(); + assert_eq!( + ERASURE_SET_SIZE, + recovered + + (erasure_meta.coding.count_ones() + + erasure_meta.data.count_ones()) + as usize, + "Recovery should always complete a set" + ); + + info!("[try_erasure] recovered {} blobs", recovered); + inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered); + Some((data, coding)) + } + Err(Error::ErasureError(e)) => { + inc_new_counter_info!("blocktree-erasure-recovery_failed", 1); + error!( + "[try_erasure] recovery failed: slot: {}, set_index: {}, cause: {}", + slot, erasure_meta.set_index, e + ); + None + } + + Err(e) => return Err(e), + } } ErasureMetaStatus::StillNeed(needed) => { - inc_new_counter_info!("blocktree-erasure-blobs_needed", needed) + inc_new_counter_info!("blocktree-erasure-blobs_needed", needed); + None + } + ErasureMetaStatus::DataFull => { + inc_new_counter_info!("blocktree-erasure-complete", 1); + None + } + }; + + Ok(blobs) + } + + fn insert_data_blob_batch<'a, I>( + &self, + new_blobs: I, + slot_meta_working_set: &mut HashMap>, Option)>, + erasure_meta_working_set: &mut HashMap<(u64, u64), ErasureMeta>, + prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, + write_batch: &mut WriteBatch, + ) -> Result<()> + where + I: IntoIterator, + { + for blob in new_blobs.into_iter() { + let inserted = self.check_insert_data_blob( + blob, + slot_meta_working_set, + prev_inserted_blob_datas, + write_batch, + ); + + if inserted { + erasure_meta_working_set + .get_mut(&(blob.slot(), ErasureMeta::set_index_for(blob.index()))) + .unwrap() + .set_data_present(blob.index(), true); } - ErasureMetaStatus::DataFull => inc_new_counter_info!("blocktree-erasure-complete", 1), } + Ok(()) } @@ -977,6 +1066,54 @@ impl Blocktree { } } + /// Checks to see if the data blob passes integrity checks for insertion. Proceeds with + /// insertion if it does. 
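+    /// Returns `true` if an insertion was attempted, `false` if the blob was
+    /// skipped because its slot is already marked full.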
+ fn check_insert_data_blob<'a>( + &self, + blob: &'a Blob, + slot_meta_working_set: &mut HashMap>, Option)>, + prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, + write_batch: &mut WriteBatch, + ) -> bool { + let blob_slot = blob.slot(); + let parent_slot = blob.parent(); + + // Check if we've already inserted the slot metadata for this blob's slot + let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| { + // Store a 2-tuple of the metadata (working copy, backup copy) + if let Some(mut meta) = self + .meta(blob_slot) + .expect("Expect database get to succeed") + { + let backup = Some(meta.clone()); + // If parent_slot == std::u64::MAX, then this is one of the orphans inserted + // during the chaining process, see the function find_slot_meta_in_cached_state() + // for details. Slots that are orphans are missing a parent_slot, so we should + // fill in the parent now that we know it. + if Blocktree::is_orphan(&meta) { + meta.parent_slot = parent_slot; + } + + (Rc::new(RefCell::new(meta)), backup) + } else { + ( + Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))), + None, + ) + } + }); + + let slot_meta = &mut entry.0.borrow_mut(); + + // This slot is full, skip the bogus blob + if slot_meta.is_full() { + false + } else { + let _ = self.insert_data_blob(blob, prev_inserted_blob_datas, slot_meta, write_batch); + true + } + } + /// Insert a blob into ledger, updating the slot_meta if necessary fn insert_data_blob<'a>( &self, @@ -1042,10 +1179,14 @@ impl Blocktree { } /// Attempts recovery using erasure coding - fn recover(&self, slot: u64, set_index: u64) -> Result { - use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA}; - - let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap(); + fn recover( + &self, + slot: u64, + erasure_meta: &ErasureMeta, + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + new_coding: Option<(u64, &[u8])>, + ) -> Result<(Vec, Vec)> { + use crate::erasure::ERASURE_SET_SIZE; let start_idx = erasure_meta.start_index(); let size = erasure_meta.size(); @@ -1057,16 +1198,19 @@ impl Blocktree { for i in start_idx..coding_end_idx { if erasure_meta.is_coding_present(i) { - let mut blob_bytes = self - .erasure_cf - .get_bytes((slot, i))? - .expect("erasure_meta must have no false positives"); + let mut blob_bytes = match new_coding { + Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(), + _ => self + .erasure_cf + .get_bytes((slot, i))? + .expect("ErasureMeta must have no false positives"), + }; blob_bytes.drain(..BLOB_HEADER_SIZE); blobs.push(blob_bytes); } else { - let set_relative_idx = (i - start_idx) as usize + NUM_DATA; + let set_relative_idx = erasure_meta.coding_index_in_set(i).unwrap() as usize; blobs.push(vec![0; size]); present[set_relative_idx] = false; } @@ -1075,13 +1219,16 @@ impl Blocktree { assert_ne!(size, 0); for i in start_idx..data_end_idx { - let set_relative_idx = (i - start_idx) as usize; + let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize; if erasure_meta.is_data_present(i) { - let mut blob_bytes = self - .data_cf - .get_bytes((slot, i))? - .expect("erasure_meta must have no false positives"); + let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) { + Some(bytes) => bytes.to_vec(), + None => self + .data_cf + .get_bytes((slot, i))? 
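+                    // A set presence bit in `ErasureMeta` means the blob is either
+                    // in `prev_inserted_blob_datas` (inserted earlier in this batch)
+                    // or already on disk, so this lookup must succeed.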
+ .expect("erasure_meta must have no false positives"), + }; // If data is too short, extend it with zeroes blob_bytes.resize(size, 0u8); @@ -1098,8 +1245,6 @@ impl Blocktree { .session .reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; - let amount_recovered = recovered_data.len() + recovered_coding.len(); - trace!( "[recover] reconstruction OK slot: {}, indexes: [{},{})", slot, @@ -1107,13 +1252,7 @@ impl Blocktree { data_end_idx ); - self.write_blobs(recovered_data)?; - - for blob in recovered_coding { - self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?; - } - - Ok(amount_recovered) + Ok((recovered_data, recovered_coding)) } /// Returns the next consumed index and the number of ticks in the new consumed @@ -2559,6 +2698,7 @@ pub mod tests { mod erasure { use super::*; + use crate::blocktree::meta::ErasureMetaStatus; use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; use rand::{thread_rng, Rng}; @@ -2572,11 +2712,13 @@ pub mod tests { #[test] fn test_erasure_meta_accuracy() { + use ErasureMetaStatus::{DataFull, StillNeed}; + let path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&path).unwrap(); - // one erasure set + half of the next - let num_blobs = 24; + // two erasure sets + let num_blobs = 32; let slot = 0; let (blobs, _) = make_slot_entries(slot, 0, num_blobs); @@ -2596,8 +2738,7 @@ pub mod tests { assert!(erasure_meta_opt.is_some()); let erasure_meta = erasure_meta_opt.unwrap(); - assert_eq!(erasure_meta.data, 0xFF00); - assert_eq!(erasure_meta.coding, 0x0); + assert_eq!(erasure_meta.status(), StillNeed(8)); blocktree.write_blobs(&blobs[..8]).unwrap(); @@ -2609,18 +2750,9 @@ pub mod tests { assert_eq!(erasure_meta.data, 0xFFFF); assert_eq!(erasure_meta.coding, 0x0); + assert_eq!(erasure_meta.status(), DataFull); - blocktree.write_blobs(&blobs[16..]).unwrap(); - - let erasure_meta = blocktree - .erasure_meta_cf - .get((slot, 1)) - .expect("DB get must succeed") - .unwrap(); - - assert_eq!(erasure_meta.data, 0x00FF); - assert_eq!(erasure_meta.coding, 0x0); - + // insert all coding blobs in first set let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); @@ -2640,7 +2772,68 @@ pub mod tests { assert_eq!(erasure_meta.data, 0xFFFF); assert_eq!(erasure_meta.coding, 0x0F); + assert_eq!(erasure_meta.status(), DataFull); + // insert 8 of 16 data blobs in 2nd set + blocktree.write_blobs(&blobs[16..24]).unwrap(); + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 1)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.data, 0x00FF); + assert_eq!(erasure_meta.coding, 0x0); + assert_eq!(erasure_meta.status(), StillNeed(8)); + + // insert all coding blobs in 2nd set + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let coding_blobs = coding_generator.next(&shared_blobs[NUM_DATA..]); + + for shared_coding_blob in coding_blobs { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) + .unwrap(); + } + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 1)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.data, 0x00FF); + assert_eq!(erasure_meta.coding, 0x0F); + assert_eq!(erasure_meta.status(), StillNeed(4)); + + // insert 3 more data blobs in 2nd 
erasure set. + blocktree.write_blobs(&blobs[24..27]).unwrap(); + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 1)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.data, 0x07FF); + assert_eq!(erasure_meta.coding, 0x0F); + assert_eq!(erasure_meta.status(), StillNeed(1)); + + // insert 1 more data blob, should trigger erasure + blocktree.write_blobs(&blobs[28..29]).unwrap(); + + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, 1)) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.status(), DataFull); + + // remove coding blobs, erasure meta should still report being full let (start_idx, coding_end_idx) = (erasure_meta.start_index(), erasure_meta.end_indexes().1); @@ -2650,7 +2843,7 @@ pub mod tests { let erasure_meta = blocktree .erasure_meta_cf - .get((slot, 0)) + .get((slot, 1)) .expect("DB get must succeed") .unwrap(); @@ -2730,12 +2923,66 @@ pub mod tests { Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction"); } + #[test] + fn test_recovery_fails_safely() { + const SLOT: u64 = 0; + const SET_INDEX: u64 = 0; + + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&ledger_path).unwrap(); + let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64) + .0 + .into_iter() + .map(Blob::into) + .collect::>(); + + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + + let shared_coding_blobs = coding_generator.next(&data_blobs); + assert_eq!(shared_coding_blobs.len(), NUM_CODING); + + // Insert data blobs and coding. Not enough to do recovery + blocktree + .write_shared_blobs(&data_blobs[..NUM_DATA - 5]) + .unwrap(); + + for shared_blob in shared_coding_blobs { + let blob = shared_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + + blocktree + .put_coding_blob_bytes(SLOT, blob.index(), &blob.data[..size]) + .expect("Inserting coding blobs must succeed"); + } + + // try recovery even though there aren't enough blobs + let erasure_meta = blocktree + .erasure_meta_cf + .get((SLOT, SET_INDEX)) + .unwrap() + .unwrap(); + + assert_eq!(erasure_meta.status(), ErasureMetaStatus::StillNeed(1)); + + let prev_inserted_blob_datas = HashMap::new(); + + let attempt_result = + blocktree.try_erasure_recover(&erasure_meta, SLOT, &prev_inserted_blob_datas, None); + + assert!(attempt_result.is_ok()); + let recovered_blobs_opt = attempt_result.unwrap(); + + assert!(recovered_blobs_opt.is_none()); + } + #[test] fn test_recovery_multi_slot_multi_thread() { - use rand::rngs::SmallRng; - use rand::SeedableRng; + use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use std::thread; + const N_THREADS: usize = 3; + let slots = vec![0, 3, 5, 50, 100]; let max_erasure_sets = 16; solana_logger::setup(); @@ -2744,8 +2991,9 @@ pub mod tests { let mut rng = thread_rng(); // Specification should generate a ledger where each slot has an random number of - // erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones - // will have between 1 data blob missing and 1 coding blob + // erasure sets. 
Odd erasure sets will have all coding blobs and between 1-4 data blobs + // missing, and even ones will have between 1-2 data blobs missing and 1-2 coding blobs + // missing let specs = slots .iter() .map(|&slot| { @@ -2754,9 +3002,12 @@ pub mod tests { let set_specs = (0..num_erasure_sets) .map(|set_index| { let (num_data, num_coding) = if set_index % 2 == 0 { - (NUM_DATA - rng.gen_range(1, 5), NUM_CODING) + ( + NUM_DATA - rng.gen_range(1, 3), + NUM_CODING - rng.gen_range(1, 3), + ) } else { - (NUM_DATA - 1, NUM_CODING - 1) + (NUM_DATA - rng.gen_range(1, 5), NUM_CODING) }; ErasureSpec { set_index, @@ -2770,66 +3021,82 @@ pub mod tests { }) .collect::>(); - let model = generate_ledger_model(&specs); + let model = generate_ledger_model(specs); let blocktree = Arc::new(Blocktree::open(&path).unwrap()); // Write to each slot in a different thread simultaneously. // These writes should trigger the recovery. Every erasure set should have all of its - // data blobs + // data blobs and coding_blobs at the end let mut handles = vec![]; - for slot_model in model.clone() { + // Each thread will attempt to write to each slot in order. Within a slot, each thread + // will try to write each erasure set in a random order. Within each erasure set, there + // is a 50/50 chance of attempting to write the coding blobs first or the data blobs + // first. + // The goal is to be as racey as possible and cover a wide range of situations + for _ in 0..N_THREADS { let blocktree = Arc::clone(&blocktree); - let slot = slot_model.slot; let mut rng = SmallRng::from_rng(&mut rng).unwrap(); + let model = model.clone(); let handle = thread::spawn(move || { - for erasure_set in slot_model.chunks { - // for even sets, write data blobs first, then write coding blobs, which - // should trigger recovery since all coding blobs will be inserted and - // between 1-4 data blobs are missing - if rng.gen() { - blocktree - .write_shared_blobs(erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); + for slot_model in model { + let slot = slot_model.slot; + let num_erasure_sets = slot_model.chunks.len(); + let unordered_sets = slot_model + .chunks + .choose_multiple(&mut rng, num_erasure_sets); - for shared_coding_blob in erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; + for erasure_set in unordered_sets { + if rng.gen() { blocktree - .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - } else { - // for odd sets, write coding blobs first, then write the data blobs. 
- // writing the data blobs should trigger recovery, since 3/4 coding and - // 15/16 data blobs will be present - for shared_coding_blob in erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); + .write_shared_blobs(&erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); - blocktree - .write_shared_blobs(erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); + for shared_coding_blob in &erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes( + slot, + blob.index(), + &blob.data[..size], + ) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } else { + // write coding blobs first, then write the data blobs. + for shared_coding_blob in &erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes( + slot, + blob.index(), + &blob.data[..size], + ) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + + blocktree + .write_shared_blobs(&erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } } } }); @@ -2862,10 +3129,8 @@ pub mod tests { assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); // Should have all data assert_eq!(erasure_meta.data, 0xFFFF); - if set_index % 2 == 0 { - // Even sets have all coding - assert_eq!(erasure_meta.coding, 0x0F); - } + // Should have all coding + assert_eq!(erasure_meta.coding, 0x0F); } } diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 5a31ae984b..4764785e01 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -58,7 +58,7 @@ impl SlotMeta { } } -#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] /// Erasure coding information pub struct ErasureMeta { /// Which erasure set in the slot this is @@ -104,12 +104,7 @@ impl ErasureMeta { } pub fn is_coding_present(&self, index: u64) -> bool { - let start = self.start_index(); - let end = start + NUM_CODING as u64; - - if start <= index && index < end { - let position = index - start; - + if let Some(position) = self.data_index_in_set(index) { self.coding & (1 << position) != 0 } else { false @@ -125,11 +120,7 @@ impl ErasureMeta { } pub fn set_coding_present(&mut self, index: u64, present: bool) { - let set_index = Self::set_index_for(index); - - if set_index as u64 == self.set_index { - let position = index - self.start_index(); - + if let Some(position) = self.data_index_in_set(index) { if present { self.coding |= 1 << position; } else { @@ -139,12 +130,7 @@ impl ErasureMeta { } pub fn is_data_present(&self, index: u64) -> bool { - let start = self.start_index(); - 
let end = start + NUM_DATA as u64; - - if start <= index && index < end { - let position = index - start; - + if let Some(position) = self.data_index_in_set(index) { self.data & (1 << position) != 0 } else { false @@ -152,11 +138,7 @@ impl ErasureMeta { } pub fn set_data_present(&mut self, index: u64, present: bool) { - let set_index = Self::set_index_for(index); - - if set_index as u64 == self.set_index { - let position = index - self.start_index(); - + if let Some(position) = self.data_index_in_set(index) { if present { self.data |= 1 << position; } else { @@ -169,6 +151,20 @@ impl ErasureMeta { index / NUM_DATA as u64 } + pub fn data_index_in_set(&self, index: u64) -> Option { + let set_index = Self::set_index_for(index); + + if set_index == self.set_index { + Some(index - self.start_index()) + } else { + None + } + } + + pub fn coding_index_in_set(&self, index: u64) -> Option { + self.data_index_in_set(index).map(|i| i + NUM_DATA as u64) + } + pub fn start_index(&self) -> u64 { self.set_index * NUM_DATA as u64 } @@ -183,24 +179,42 @@ impl ErasureMeta { #[test] fn test_meta_indexes() { use rand::{thread_rng, Rng}; + // to avoid casts everywhere + const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64; let mut rng = thread_rng(); for _ in 0..100 { let set_index = rng.gen_range(0, 1_000); - let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16); + let blob_index = (set_index * NUM_DATA) + rng.gen_range(0, 16); assert_eq!(set_index, ErasureMeta::set_index_for(blob_index)); let e_meta = ErasureMeta::new(set_index); - assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64); + assert_eq!(e_meta.start_index(), set_index * NUM_DATA); let (data_end_idx, coding_end_idx) = e_meta.end_indexes(); - assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64); - assert_eq!( - coding_end_idx, - set_index * NUM_DATA as u64 + NUM_CODING as u64 - ); + assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA); + assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64); } + + let mut e_meta = ErasureMeta::new(0); + + assert_eq!(e_meta.data_index_in_set(0), Some(0)); + assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2)); + assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1)); + assert_eq!(e_meta.data_index_in_set(NUM_DATA), None); + assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None); + + e_meta.set_index = 1; + + assert_eq!(e_meta.data_index_in_set(0), None); + assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None); + assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0)); + assert_eq!( + e_meta.data_index_in_set(NUM_DATA * 2 - 1), + Some(NUM_DATA - 1) + ); + assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None); } #[test] diff --git a/core/src/erasure.rs b/core/src/erasure.rs index fefa0b5217..473e73e4f6 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -41,7 +41,6 @@ //! //! use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; -use crate::result::Result; use std::cmp; use std::convert::AsMut; use std::sync::{Arc, RwLock}; @@ -56,6 +55,8 @@ pub const NUM_CODING: usize = 4; /// Total number of blobs in an erasure set; includes data and coding blobs pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; +type Result = std::result::Result; + /// Represents an erasure "session" with a particular configuration and number of data and coding /// blobs #[derive(Debug, Clone)]
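
For reference, below is a minimal, self-contained sketch of the presence-bitmask
bookkeeping that this patch consolidates in `ErasureMeta`. `ErasureMeta::status()`
itself is not part of this diff; the transitions shown are inferred from the
assertions in `test_erasure_meta_accuracy`, and `Meta`/`Status` are simplified
stand-ins rather than the crate's types (NUM_DATA = 16 and NUM_CODING = 4, as
defined in erasure.rs).

const NUM_DATA: u64 = 16;
const NUM_CODING: u64 = 4;

#[derive(Debug, PartialEq)]
enum Status {
    CanRecover,
    DataFull,
    StillNeed(usize),
}

#[derive(Default)]
struct Meta {
    set_index: u64,
    data: u64,   // presence bitmask, one bit per data blob in the set
    coding: u64, // presence bitmask, one bit per coding blob in the set
}

impl Meta {
    fn start_index(&self) -> u64 {
        self.set_index * NUM_DATA
    }

    // Mirrors `data_index_in_set`: Some(position) only when `index` falls
    // inside this erasure set, which is what keeps blobs from neighboring
    // sets from flipping this set's bits.
    fn index_in_set(&self, index: u64) -> Option<u64> {
        if index / NUM_DATA == self.set_index {
            Some(index - self.start_index())
        } else {
            None
        }
    }

    fn set_data_present(&mut self, index: u64) {
        if let Some(pos) = self.index_in_set(index) {
            self.data |= 1 << pos;
        }
    }

    fn set_coding_present(&mut self, index: u64) {
        if let Some(pos) = self.index_in_set(index) {
            self.coding |= 1 << pos;
        }
    }

    // Inferred behavior: full data wins; otherwise any NUM_DATA of the
    // ERASURE_SET_SIZE pieces make the set recoverable.
    fn status(&self) -> Status {
        let present = u64::from(self.data.count_ones() + self.coding.count_ones());
        if u64::from(self.data.count_ones()) == NUM_DATA {
            Status::DataFull
        } else if present >= NUM_DATA {
            Status::CanRecover
        } else {
            Status::StillNeed((NUM_DATA - present) as usize)
        }
    }
}

fn main() {
    // Second erasure set of a slot: data indexes 16..32, coding indexes 16..20.
    let mut meta = Meta { set_index: 1, ..Default::default() };

    for i in 16..24 {
        meta.set_data_present(i); // 8 of 16 data blobs
    }
    assert_eq!(meta.status(), Status::StillNeed(8));

    for i in 16..16 + NUM_CODING {
        meta.set_coding_present(i); // all 4 coding blobs
    }
    assert_eq!(meta.status(), Status::StillNeed(4));

    for i in 24..28 {
        meta.set_data_present(i); // 4 more data blobs: 16 of 16 pieces present
    }
    // In the crate, recovery would now run and flip this to DataFull.
    assert_eq!(meta.status(), Status::CanRecover);
    println!("transitions match the test's assertions");
}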