Fix race in erasure metadata tracking (#3962)
* Fix erasure metadata race condition * make erasure return the underlying error without wrapping it in the `solana::Error` type * Add metric for erasure failures * add tests to `ErasureMeta` indexing logic * Add test to ensure erasure recovery failures don't cause panics
This commit is contained in:
parent
54b44977e0
commit
9a40ad76bd
|
@ -256,52 +256,21 @@ impl Blocktree {
|
|||
I::Item: Borrow<Blob>,
|
||||
{
|
||||
let mut write_batch = self.db.batch()?;
|
||||
let new_blobs: Vec<_> = new_blobs.into_iter().collect();
|
||||
let mut recovered_data = vec![];
|
||||
|
||||
let mut prev_inserted_blob_datas = HashMap::new();
|
||||
// A map from slot to a 2-tuple of metadata: (working copy, backup copy),
|
||||
// so we can detect changes to the slot metadata later
|
||||
let mut slot_meta_working_set = HashMap::new();
|
||||
let mut erasure_meta_working_set = HashMap::new();
|
||||
let new_blobs: Vec<_> = new_blobs.into_iter().collect();
|
||||
let mut prev_inserted_blob_datas = HashMap::new();
|
||||
|
||||
for blob in new_blobs.iter() {
|
||||
let blob = blob.borrow();
|
||||
let blob_slot = blob.slot();
|
||||
let parent_slot = blob.parent();
|
||||
|
||||
// Check if we've already inserted the slot metadata for this blob's slot
|
||||
let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| {
|
||||
// Store a 2-tuple of the metadata (working copy, backup copy)
|
||||
if let Some(mut meta) = self
|
||||
.meta(blob_slot)
|
||||
.expect("Expect database get to succeed")
|
||||
{
|
||||
let backup = Some(meta.clone());
|
||||
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
|
||||
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
||||
// for details. Slots that are orphans are missing a parent_slot, so we should
|
||||
// fill in the parent now that we know it.
|
||||
if Self::is_orphan(&meta) {
|
||||
meta.parent_slot = parent_slot;
|
||||
}
|
||||
|
||||
(Rc::new(RefCell::new(meta)), backup)
|
||||
} else {
|
||||
(
|
||||
Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
});
|
||||
|
||||
let slot_meta = &mut entry.0.borrow_mut();
|
||||
|
||||
// This slot is full, skip the bogus blob
|
||||
if slot_meta.is_full() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let set_index = ErasureMeta::set_index_for(blob.index());
|
||||
let erasure_meta_entry = erasure_meta_working_set
|
||||
erasure_meta_working_set
|
||||
.entry((blob_slot, set_index))
|
||||
.or_insert_with(|| {
|
||||
self.erasure_meta_cf
|
||||
|
@ -309,17 +278,43 @@ impl Blocktree {
|
|||
.expect("Expect database get to succeed")
|
||||
.unwrap_or_else(|| ErasureMeta::new(set_index))
|
||||
});
|
||||
|
||||
erasure_meta_entry.set_data_present(blob.index(), true);
|
||||
|
||||
let _ = self.insert_data_blob(
|
||||
blob,
|
||||
&mut prev_inserted_blob_datas,
|
||||
slot_meta,
|
||||
&mut write_batch,
|
||||
);
|
||||
}
|
||||
|
||||
self.insert_data_blob_batch(
|
||||
new_blobs.iter().map(Borrow::borrow),
|
||||
&mut slot_meta_working_set,
|
||||
&mut erasure_meta_working_set,
|
||||
&mut prev_inserted_blob_datas,
|
||||
&mut write_batch,
|
||||
)?;
|
||||
|
||||
for (&(slot, _), erasure_meta) in erasure_meta_working_set.iter_mut() {
|
||||
if let Some((data, coding)) =
|
||||
self.try_erasure_recover(&erasure_meta, slot, &prev_inserted_blob_datas, None)?
|
||||
{
|
||||
for data_blob in data {
|
||||
recovered_data.push(data_blob);
|
||||
}
|
||||
|
||||
for coding_blob in coding {
|
||||
erasure_meta.set_coding_present(coding_blob.index(), true);
|
||||
|
||||
write_batch.put_bytes::<cf::Coding>(
|
||||
(coding_blob.slot(), coding_blob.index()),
|
||||
&coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()],
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.insert_data_blob_batch(
|
||||
recovered_data.iter(),
|
||||
&mut slot_meta_working_set,
|
||||
&mut erasure_meta_working_set,
|
||||
&mut prev_inserted_blob_datas,
|
||||
&mut write_batch,
|
||||
)?;
|
||||
|
||||
// Handle chaining for the working set
|
||||
self.handle_chaining(&mut write_batch, &slot_meta_working_set)?;
|
||||
let mut should_signal = false;
|
||||
|
@ -335,21 +330,17 @@ impl Blocktree {
|
|||
}
|
||||
}
|
||||
|
||||
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
|
||||
write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
|
||||
for ((slot, set_index), erasure_meta) in erasure_meta_working_set {
|
||||
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
|
||||
}
|
||||
|
||||
self.db.write(write_batch)?;
|
||||
|
||||
if should_signal {
|
||||
for signal in self.new_blobs_signals.iter() {
|
||||
let _ = signal.try_send(true);
|
||||
}
|
||||
}
|
||||
|
||||
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() {
|
||||
self.try_erasure_recover(&erasure_meta, slot, set_index)?;
|
||||
}
|
||||
self.db.write(write_batch)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -448,6 +439,8 @@ impl Blocktree {
|
|||
self.data_cf.put_bytes((slot, index), bytes)
|
||||
}
|
||||
|
||||
/// For benchmarks, testing, and setup.
|
||||
/// Does no metadata tracking. Use with care.
|
||||
pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
|
||||
self.erasure_cf.put_bytes((slot, index), bytes)
|
||||
}
|
||||
|
@ -468,29 +461,125 @@ impl Blocktree {
|
|||
|
||||
writebatch.put_bytes::<cf::Coding>((slot, index), bytes)?;
|
||||
|
||||
if let Some((data, coding)) =
|
||||
self.try_erasure_recover(&erasure_meta, slot, &HashMap::new(), Some((index, bytes)))?
|
||||
{
|
||||
let mut erasure_meta_working_set = HashMap::new();
|
||||
erasure_meta_working_set.insert((slot, set_index), erasure_meta);
|
||||
|
||||
self.insert_data_blob_batch(
|
||||
&data[..],
|
||||
&mut HashMap::new(),
|
||||
&mut erasure_meta_working_set,
|
||||
&mut HashMap::new(),
|
||||
&mut writebatch,
|
||||
)?;
|
||||
|
||||
erasure_meta = *erasure_meta_working_set.values().next().unwrap();
|
||||
|
||||
for coding_blob in coding {
|
||||
erasure_meta.set_coding_present(coding_blob.index(), true);
|
||||
|
||||
writebatch.put_bytes::<cf::Coding>(
|
||||
(coding_blob.slot(), coding_blob.index()),
|
||||
&coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()],
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
writebatch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
|
||||
|
||||
self.db.write(writebatch)?;
|
||||
|
||||
self.try_erasure_recover(&erasure_meta, slot, set_index)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_erasure_recover(
|
||||
&self,
|
||||
erasure_meta: &ErasureMeta,
|
||||
slot: u64,
|
||||
set_index: u64,
|
||||
) -> Result<()> {
|
||||
match erasure_meta.status() {
|
||||
prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
|
||||
new_coding_blob: Option<(u64, &[u8])>,
|
||||
) -> Result<Option<(Vec<Blob>, Vec<Blob>)>> {
|
||||
use crate::erasure::ERASURE_SET_SIZE;
|
||||
|
||||
let blobs = match erasure_meta.status() {
|
||||
ErasureMetaStatus::CanRecover => {
|
||||
let recovered = self.recover(slot, set_index)?;
|
||||
let erasure_result = self.recover(
|
||||
slot,
|
||||
erasure_meta,
|
||||
prev_inserted_blob_datas,
|
||||
new_coding_blob,
|
||||
);
|
||||
|
||||
match erasure_result {
|
||||
Ok((data, coding)) => {
|
||||
let recovered = data.len() + coding.len();
|
||||
assert_eq!(
|
||||
ERASURE_SET_SIZE,
|
||||
recovered
|
||||
+ (erasure_meta.coding.count_ones()
|
||||
+ erasure_meta.data.count_ones())
|
||||
as usize,
|
||||
"Recovery should always complete a set"
|
||||
);
|
||||
|
||||
info!("[try_erasure] recovered {} blobs", recovered);
|
||||
inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered);
|
||||
Some((data, coding))
|
||||
}
|
||||
Err(Error::ErasureError(e)) => {
|
||||
inc_new_counter_info!("blocktree-erasure-recovery_failed", 1);
|
||||
error!(
|
||||
"[try_erasure] recovery failed: slot: {}, set_index: {}, cause: {}",
|
||||
slot, erasure_meta.set_index, e
|
||||
);
|
||||
None
|
||||
}
|
||||
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
ErasureMetaStatus::StillNeed(needed) => {
|
||||
inc_new_counter_info!("blocktree-erasure-blobs_needed", needed)
|
||||
inc_new_counter_info!("blocktree-erasure-blobs_needed", needed);
|
||||
None
|
||||
}
|
||||
ErasureMetaStatus::DataFull => inc_new_counter_info!("blocktree-erasure-complete", 1),
|
||||
ErasureMetaStatus::DataFull => {
|
||||
inc_new_counter_info!("blocktree-erasure-complete", 1);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
Ok(blobs)
|
||||
}
|
||||
|
||||
fn insert_data_blob_batch<'a, I>(
|
||||
&self,
|
||||
new_blobs: I,
|
||||
slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
|
||||
erasure_meta_working_set: &mut HashMap<(u64, u64), ErasureMeta>,
|
||||
prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>,
|
||||
write_batch: &mut WriteBatch,
|
||||
) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = &'a Blob>,
|
||||
{
|
||||
for blob in new_blobs.into_iter() {
|
||||
let inserted = self.check_insert_data_blob(
|
||||
blob,
|
||||
slot_meta_working_set,
|
||||
prev_inserted_blob_datas,
|
||||
write_batch,
|
||||
);
|
||||
|
||||
if inserted {
|
||||
erasure_meta_working_set
|
||||
.get_mut(&(blob.slot(), ErasureMeta::set_index_for(blob.index())))
|
||||
.unwrap()
|
||||
.set_data_present(blob.index(), true);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -977,6 +1066,54 @@ impl Blocktree {
|
|||
}
|
||||
}
|
||||
|
||||
/// Checks to see if the data blob passes integrity checks for insertion. Proceeds with
|
||||
/// insertion if it does.
|
||||
fn check_insert_data_blob<'a>(
|
||||
&self,
|
||||
blob: &'a Blob,
|
||||
slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
|
||||
prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>,
|
||||
write_batch: &mut WriteBatch,
|
||||
) -> bool {
|
||||
let blob_slot = blob.slot();
|
||||
let parent_slot = blob.parent();
|
||||
|
||||
// Check if we've already inserted the slot metadata for this blob's slot
|
||||
let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| {
|
||||
// Store a 2-tuple of the metadata (working copy, backup copy)
|
||||
if let Some(mut meta) = self
|
||||
.meta(blob_slot)
|
||||
.expect("Expect database get to succeed")
|
||||
{
|
||||
let backup = Some(meta.clone());
|
||||
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
|
||||
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
||||
// for details. Slots that are orphans are missing a parent_slot, so we should
|
||||
// fill in the parent now that we know it.
|
||||
if Blocktree::is_orphan(&meta) {
|
||||
meta.parent_slot = parent_slot;
|
||||
}
|
||||
|
||||
(Rc::new(RefCell::new(meta)), backup)
|
||||
} else {
|
||||
(
|
||||
Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
});
|
||||
|
||||
let slot_meta = &mut entry.0.borrow_mut();
|
||||
|
||||
// This slot is full, skip the bogus blob
|
||||
if slot_meta.is_full() {
|
||||
false
|
||||
} else {
|
||||
let _ = self.insert_data_blob(blob, prev_inserted_blob_datas, slot_meta, write_batch);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a blob into ledger, updating the slot_meta if necessary
|
||||
fn insert_data_blob<'a>(
|
||||
&self,
|
||||
|
@ -1042,10 +1179,14 @@ impl Blocktree {
|
|||
}
|
||||
|
||||
/// Attempts recovery using erasure coding
|
||||
fn recover(&self, slot: u64, set_index: u64) -> Result<usize> {
|
||||
use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA};
|
||||
|
||||
let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap();
|
||||
fn recover(
|
||||
&self,
|
||||
slot: u64,
|
||||
erasure_meta: &ErasureMeta,
|
||||
prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
|
||||
new_coding: Option<(u64, &[u8])>,
|
||||
) -> Result<(Vec<Blob>, Vec<Blob>)> {
|
||||
use crate::erasure::ERASURE_SET_SIZE;
|
||||
|
||||
let start_idx = erasure_meta.start_index();
|
||||
let size = erasure_meta.size();
|
||||
|
@ -1057,16 +1198,19 @@ impl Blocktree {
|
|||
|
||||
for i in start_idx..coding_end_idx {
|
||||
if erasure_meta.is_coding_present(i) {
|
||||
let mut blob_bytes = self
|
||||
let mut blob_bytes = match new_coding {
|
||||
Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(),
|
||||
_ => self
|
||||
.erasure_cf
|
||||
.get_bytes((slot, i))?
|
||||
.expect("erasure_meta must have no false positives");
|
||||
.expect("ErasureMeta must have no false positives"),
|
||||
};
|
||||
|
||||
blob_bytes.drain(..BLOB_HEADER_SIZE);
|
||||
|
||||
blobs.push(blob_bytes);
|
||||
} else {
|
||||
let set_relative_idx = (i - start_idx) as usize + NUM_DATA;
|
||||
let set_relative_idx = erasure_meta.coding_index_in_set(i).unwrap() as usize;
|
||||
blobs.push(vec![0; size]);
|
||||
present[set_relative_idx] = false;
|
||||
}
|
||||
|
@ -1075,13 +1219,16 @@ impl Blocktree {
|
|||
assert_ne!(size, 0);
|
||||
|
||||
for i in start_idx..data_end_idx {
|
||||
let set_relative_idx = (i - start_idx) as usize;
|
||||
let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize;
|
||||
|
||||
if erasure_meta.is_data_present(i) {
|
||||
let mut blob_bytes = self
|
||||
let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) {
|
||||
Some(bytes) => bytes.to_vec(),
|
||||
None => self
|
||||
.data_cf
|
||||
.get_bytes((slot, i))?
|
||||
.expect("erasure_meta must have no false positives");
|
||||
.expect("erasure_meta must have no false positives"),
|
||||
};
|
||||
|
||||
// If data is too short, extend it with zeroes
|
||||
blob_bytes.resize(size, 0u8);
|
||||
|
@ -1098,8 +1245,6 @@ impl Blocktree {
|
|||
.session
|
||||
.reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?;
|
||||
|
||||
let amount_recovered = recovered_data.len() + recovered_coding.len();
|
||||
|
||||
trace!(
|
||||
"[recover] reconstruction OK slot: {}, indexes: [{},{})",
|
||||
slot,
|
||||
|
@ -1107,13 +1252,7 @@ impl Blocktree {
|
|||
data_end_idx
|
||||
);
|
||||
|
||||
self.write_blobs(recovered_data)?;
|
||||
|
||||
for blob in recovered_coding {
|
||||
self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?;
|
||||
}
|
||||
|
||||
Ok(amount_recovered)
|
||||
Ok((recovered_data, recovered_coding))
|
||||
}
|
||||
|
||||
/// Returns the next consumed index and the number of ticks in the new consumed
|
||||
|
@ -2559,6 +2698,7 @@ pub mod tests {
|
|||
|
||||
mod erasure {
|
||||
use super::*;
|
||||
use crate::blocktree::meta::ErasureMetaStatus;
|
||||
use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec};
|
||||
use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
@ -2572,11 +2712,13 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_erasure_meta_accuracy() {
|
||||
use ErasureMetaStatus::{DataFull, StillNeed};
|
||||
|
||||
let path = get_tmp_ledger_path!();
|
||||
let blocktree = Blocktree::open(&path).unwrap();
|
||||
|
||||
// one erasure set + half of the next
|
||||
let num_blobs = 24;
|
||||
// two erasure sets
|
||||
let num_blobs = 32;
|
||||
let slot = 0;
|
||||
|
||||
let (blobs, _) = make_slot_entries(slot, 0, num_blobs);
|
||||
|
@ -2596,8 +2738,7 @@ pub mod tests {
|
|||
assert!(erasure_meta_opt.is_some());
|
||||
let erasure_meta = erasure_meta_opt.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.data, 0xFF00);
|
||||
assert_eq!(erasure_meta.coding, 0x0);
|
||||
assert_eq!(erasure_meta.status(), StillNeed(8));
|
||||
|
||||
blocktree.write_blobs(&blobs[..8]).unwrap();
|
||||
|
||||
|
@ -2609,18 +2750,9 @@ pub mod tests {
|
|||
|
||||
assert_eq!(erasure_meta.data, 0xFFFF);
|
||||
assert_eq!(erasure_meta.coding, 0x0);
|
||||
assert_eq!(erasure_meta.status(), DataFull);
|
||||
|
||||
blocktree.write_blobs(&blobs[16..]).unwrap();
|
||||
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((slot, 1))
|
||||
.expect("DB get must succeed")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.data, 0x00FF);
|
||||
assert_eq!(erasure_meta.coding, 0x0);
|
||||
|
||||
// insert all coding blobs in first set
|
||||
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
|
||||
let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]);
|
||||
|
||||
|
@ -2640,7 +2772,68 @@ pub mod tests {
|
|||
|
||||
assert_eq!(erasure_meta.data, 0xFFFF);
|
||||
assert_eq!(erasure_meta.coding, 0x0F);
|
||||
assert_eq!(erasure_meta.status(), DataFull);
|
||||
|
||||
// insert 8 of 16 data blobs in 2nd set
|
||||
blocktree.write_blobs(&blobs[16..24]).unwrap();
|
||||
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((slot, 1))
|
||||
.expect("DB get must succeed")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.data, 0x00FF);
|
||||
assert_eq!(erasure_meta.coding, 0x0);
|
||||
assert_eq!(erasure_meta.status(), StillNeed(8));
|
||||
|
||||
// insert all coding blobs in 2nd set
|
||||
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
|
||||
let coding_blobs = coding_generator.next(&shared_blobs[NUM_DATA..]);
|
||||
|
||||
for shared_coding_blob in coding_blobs {
|
||||
let blob = shared_coding_blob.read().unwrap();
|
||||
let size = blob.size() + BLOB_HEADER_SIZE;
|
||||
blocktree
|
||||
.put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((slot, 1))
|
||||
.expect("DB get must succeed")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.data, 0x00FF);
|
||||
assert_eq!(erasure_meta.coding, 0x0F);
|
||||
assert_eq!(erasure_meta.status(), StillNeed(4));
|
||||
|
||||
// insert 3 more data blobs in 2nd erasure set.
|
||||
blocktree.write_blobs(&blobs[24..27]).unwrap();
|
||||
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((slot, 1))
|
||||
.expect("DB get must succeed")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.data, 0x07FF);
|
||||
assert_eq!(erasure_meta.coding, 0x0F);
|
||||
assert_eq!(erasure_meta.status(), StillNeed(1));
|
||||
|
||||
// insert 1 more data blob, should trigger erasure
|
||||
blocktree.write_blobs(&blobs[28..29]).unwrap();
|
||||
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((slot, 1))
|
||||
.expect("DB get must succeed")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.status(), DataFull);
|
||||
|
||||
// remove coding blobs, erasure meta should still report being full
|
||||
let (start_idx, coding_end_idx) =
|
||||
(erasure_meta.start_index(), erasure_meta.end_indexes().1);
|
||||
|
||||
|
@ -2650,7 +2843,7 @@ pub mod tests {
|
|||
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((slot, 0))
|
||||
.get((slot, 1))
|
||||
.expect("DB get must succeed")
|
||||
.unwrap();
|
||||
|
||||
|
@ -2730,12 +2923,66 @@ pub mod tests {
|
|||
Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recovery_fails_safely() {
|
||||
const SLOT: u64 = 0;
|
||||
const SET_INDEX: u64 = 0;
|
||||
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let blocktree = Blocktree::open(&ledger_path).unwrap();
|
||||
let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64)
|
||||
.0
|
||||
.into_iter()
|
||||
.map(Blob::into)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
|
||||
|
||||
let shared_coding_blobs = coding_generator.next(&data_blobs);
|
||||
assert_eq!(shared_coding_blobs.len(), NUM_CODING);
|
||||
|
||||
// Insert data blobs and coding. Not enough to do recovery
|
||||
blocktree
|
||||
.write_shared_blobs(&data_blobs[..NUM_DATA - 5])
|
||||
.unwrap();
|
||||
|
||||
for shared_blob in shared_coding_blobs {
|
||||
let blob = shared_blob.read().unwrap();
|
||||
let size = blob.size() + BLOB_HEADER_SIZE;
|
||||
|
||||
blocktree
|
||||
.put_coding_blob_bytes(SLOT, blob.index(), &blob.data[..size])
|
||||
.expect("Inserting coding blobs must succeed");
|
||||
}
|
||||
|
||||
// try recovery even though there aren't enough blobs
|
||||
let erasure_meta = blocktree
|
||||
.erasure_meta_cf
|
||||
.get((SLOT, SET_INDEX))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(erasure_meta.status(), ErasureMetaStatus::StillNeed(1));
|
||||
|
||||
let prev_inserted_blob_datas = HashMap::new();
|
||||
|
||||
let attempt_result =
|
||||
blocktree.try_erasure_recover(&erasure_meta, SLOT, &prev_inserted_blob_datas, None);
|
||||
|
||||
assert!(attempt_result.is_ok());
|
||||
let recovered_blobs_opt = attempt_result.unwrap();
|
||||
|
||||
assert!(recovered_blobs_opt.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recovery_multi_slot_multi_thread() {
|
||||
use rand::rngs::SmallRng;
|
||||
use rand::SeedableRng;
|
||||
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
|
||||
use std::thread;
|
||||
|
||||
const N_THREADS: usize = 3;
|
||||
|
||||
let slots = vec![0, 3, 5, 50, 100];
|
||||
let max_erasure_sets = 16;
|
||||
solana_logger::setup();
|
||||
|
@ -2744,8 +2991,9 @@ pub mod tests {
|
|||
let mut rng = thread_rng();
|
||||
|
||||
// Specification should generate a ledger where each slot has an random number of
|
||||
// erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones
|
||||
// will have between 1 data blob missing and 1 coding blob
|
||||
// erasure sets. Odd erasure sets will have all coding blobs and between 1-4 data blobs
|
||||
// missing, and even ones will have between 1-2 data blobs missing and 1-2 coding blobs
|
||||
// missing
|
||||
let specs = slots
|
||||
.iter()
|
||||
.map(|&slot| {
|
||||
|
@ -2754,9 +3002,12 @@ pub mod tests {
|
|||
let set_specs = (0..num_erasure_sets)
|
||||
.map(|set_index| {
|
||||
let (num_data, num_coding) = if set_index % 2 == 0 {
|
||||
(NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
|
||||
(
|
||||
NUM_DATA - rng.gen_range(1, 3),
|
||||
NUM_CODING - rng.gen_range(1, 3),
|
||||
)
|
||||
} else {
|
||||
(NUM_DATA - 1, NUM_CODING - 1)
|
||||
(NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
|
||||
};
|
||||
ErasureSpec {
|
||||
set_index,
|
||||
|
@ -2770,37 +3021,50 @@ pub mod tests {
|
|||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let model = generate_ledger_model(&specs);
|
||||
let model = generate_ledger_model(specs);
|
||||
let blocktree = Arc::new(Blocktree::open(&path).unwrap());
|
||||
|
||||
// Write to each slot in a different thread simultaneously.
|
||||
// These writes should trigger the recovery. Every erasure set should have all of its
|
||||
// data blobs
|
||||
// data blobs and coding_blobs at the end
|
||||
let mut handles = vec![];
|
||||
|
||||
for slot_model in model.clone() {
|
||||
// Each thread will attempt to write to each slot in order. Within a slot, each thread
|
||||
// will try to write each erasure set in a random order. Within each erasure set, there
|
||||
// is a 50/50 chance of attempting to write the coding blobs first or the data blobs
|
||||
// first.
|
||||
// The goal is to be as racey as possible and cover a wide range of situations
|
||||
for _ in 0..N_THREADS {
|
||||
let blocktree = Arc::clone(&blocktree);
|
||||
let slot = slot_model.slot;
|
||||
let mut rng = SmallRng::from_rng(&mut rng).unwrap();
|
||||
let model = model.clone();
|
||||
let handle = thread::spawn(move || {
|
||||
for erasure_set in slot_model.chunks {
|
||||
// for even sets, write data blobs first, then write coding blobs, which
|
||||
// should trigger recovery since all coding blobs will be inserted and
|
||||
// between 1-4 data blobs are missing
|
||||
for slot_model in model {
|
||||
let slot = slot_model.slot;
|
||||
let num_erasure_sets = slot_model.chunks.len();
|
||||
let unordered_sets = slot_model
|
||||
.chunks
|
||||
.choose_multiple(&mut rng, num_erasure_sets);
|
||||
|
||||
for erasure_set in unordered_sets {
|
||||
if rng.gen() {
|
||||
blocktree
|
||||
.write_shared_blobs(erasure_set.data)
|
||||
.write_shared_blobs(&erasure_set.data)
|
||||
.expect("Writing data blobs must succeed");
|
||||
debug!(
|
||||
"multislot: wrote data: slot: {}, erasure_set: {}",
|
||||
slot, erasure_set.set_index
|
||||
);
|
||||
|
||||
for shared_coding_blob in erasure_set.coding {
|
||||
for shared_coding_blob in &erasure_set.coding {
|
||||
let blob = shared_coding_blob.read().unwrap();
|
||||
let size = blob.size() + BLOB_HEADER_SIZE;
|
||||
blocktree
|
||||
.put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
|
||||
.put_coding_blob_bytes(
|
||||
slot,
|
||||
blob.index(),
|
||||
&blob.data[..size],
|
||||
)
|
||||
.expect("Writing coding blobs must succeed");
|
||||
}
|
||||
debug!(
|
||||
|
@ -2808,14 +3072,16 @@ pub mod tests {
|
|||
slot, erasure_set.set_index
|
||||
);
|
||||
} else {
|
||||
// for odd sets, write coding blobs first, then write the data blobs.
|
||||
// writing the data blobs should trigger recovery, since 3/4 coding and
|
||||
// 15/16 data blobs will be present
|
||||
for shared_coding_blob in erasure_set.coding {
|
||||
// write coding blobs first, then write the data blobs.
|
||||
for shared_coding_blob in &erasure_set.coding {
|
||||
let blob = shared_coding_blob.read().unwrap();
|
||||
let size = blob.size() + BLOB_HEADER_SIZE;
|
||||
blocktree
|
||||
.put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
|
||||
.put_coding_blob_bytes(
|
||||
slot,
|
||||
blob.index(),
|
||||
&blob.data[..size],
|
||||
)
|
||||
.expect("Writing coding blobs must succeed");
|
||||
}
|
||||
debug!(
|
||||
|
@ -2824,7 +3090,7 @@ pub mod tests {
|
|||
);
|
||||
|
||||
blocktree
|
||||
.write_shared_blobs(erasure_set.data)
|
||||
.write_shared_blobs(&erasure_set.data)
|
||||
.expect("Writing data blobs must succeed");
|
||||
debug!(
|
||||
"multislot: wrote data: slot: {}, erasure_set: {}",
|
||||
|
@ -2832,6 +3098,7 @@ pub mod tests {
|
|||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
handles.push(handle);
|
||||
|
@ -2862,12 +3129,10 @@ pub mod tests {
|
|||
assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull);
|
||||
// Should have all data
|
||||
assert_eq!(erasure_meta.data, 0xFFFF);
|
||||
if set_index % 2 == 0 {
|
||||
// Even sets have all coding
|
||||
// Should have all coding
|
||||
assert_eq!(erasure_meta.coding, 0x0F);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&path).expect("Blocktree destruction must succeed");
|
||||
|
|
|
@ -58,7 +58,7 @@ impl SlotMeta {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
||||
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
||||
/// Erasure coding information
|
||||
pub struct ErasureMeta {
|
||||
/// Which erasure set in the slot this is
|
||||
|
@ -104,12 +104,7 @@ impl ErasureMeta {
|
|||
}
|
||||
|
||||
pub fn is_coding_present(&self, index: u64) -> bool {
|
||||
let start = self.start_index();
|
||||
let end = start + NUM_CODING as u64;
|
||||
|
||||
if start <= index && index < end {
|
||||
let position = index - start;
|
||||
|
||||
if let Some(position) = self.data_index_in_set(index) {
|
||||
self.coding & (1 << position) != 0
|
||||
} else {
|
||||
false
|
||||
|
@ -125,11 +120,7 @@ impl ErasureMeta {
|
|||
}
|
||||
|
||||
pub fn set_coding_present(&mut self, index: u64, present: bool) {
|
||||
let set_index = Self::set_index_for(index);
|
||||
|
||||
if set_index as u64 == self.set_index {
|
||||
let position = index - self.start_index();
|
||||
|
||||
if let Some(position) = self.data_index_in_set(index) {
|
||||
if present {
|
||||
self.coding |= 1 << position;
|
||||
} else {
|
||||
|
@ -139,12 +130,7 @@ impl ErasureMeta {
|
|||
}
|
||||
|
||||
pub fn is_data_present(&self, index: u64) -> bool {
|
||||
let start = self.start_index();
|
||||
let end = start + NUM_DATA as u64;
|
||||
|
||||
if start <= index && index < end {
|
||||
let position = index - start;
|
||||
|
||||
if let Some(position) = self.data_index_in_set(index) {
|
||||
self.data & (1 << position) != 0
|
||||
} else {
|
||||
false
|
||||
|
@ -152,11 +138,7 @@ impl ErasureMeta {
|
|||
}
|
||||
|
||||
pub fn set_data_present(&mut self, index: u64, present: bool) {
|
||||
let set_index = Self::set_index_for(index);
|
||||
|
||||
if set_index as u64 == self.set_index {
|
||||
let position = index - self.start_index();
|
||||
|
||||
if let Some(position) = self.data_index_in_set(index) {
|
||||
if present {
|
||||
self.data |= 1 << position;
|
||||
} else {
|
||||
|
@ -169,6 +151,20 @@ impl ErasureMeta {
|
|||
index / NUM_DATA as u64
|
||||
}
|
||||
|
||||
pub fn data_index_in_set(&self, index: u64) -> Option<u64> {
|
||||
let set_index = Self::set_index_for(index);
|
||||
|
||||
if set_index == self.set_index {
|
||||
Some(index - self.start_index())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn coding_index_in_set(&self, index: u64) -> Option<u64> {
|
||||
self.data_index_in_set(index).map(|i| i + NUM_DATA as u64)
|
||||
}
|
||||
|
||||
pub fn start_index(&self) -> u64 {
|
||||
self.set_index * NUM_DATA as u64
|
||||
}
|
||||
|
@ -183,24 +179,42 @@ impl ErasureMeta {
|
|||
#[test]
|
||||
fn test_meta_indexes() {
|
||||
use rand::{thread_rng, Rng};
|
||||
// to avoid casts everywhere
|
||||
const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64;
|
||||
|
||||
let mut rng = thread_rng();
|
||||
|
||||
for _ in 0..100 {
|
||||
let set_index = rng.gen_range(0, 1_000);
|
||||
let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16);
|
||||
let blob_index = (set_index * NUM_DATA) + rng.gen_range(0, 16);
|
||||
|
||||
assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
|
||||
let e_meta = ErasureMeta::new(set_index);
|
||||
|
||||
assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64);
|
||||
assert_eq!(e_meta.start_index(), set_index * NUM_DATA);
|
||||
let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
|
||||
assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64);
|
||||
assert_eq!(
|
||||
coding_end_idx,
|
||||
set_index * NUM_DATA as u64 + NUM_CODING as u64
|
||||
);
|
||||
assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA);
|
||||
assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64);
|
||||
}
|
||||
|
||||
let mut e_meta = ErasureMeta::new(0);
|
||||
|
||||
assert_eq!(e_meta.data_index_in_set(0), Some(0));
|
||||
assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2));
|
||||
assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1));
|
||||
assert_eq!(e_meta.data_index_in_set(NUM_DATA), None);
|
||||
assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
|
||||
|
||||
e_meta.set_index = 1;
|
||||
|
||||
assert_eq!(e_meta.data_index_in_set(0), None);
|
||||
assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None);
|
||||
assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0));
|
||||
assert_eq!(
|
||||
e_meta.data_index_in_set(NUM_DATA * 2 - 1),
|
||||
Some(NUM_DATA - 1)
|
||||
);
|
||||
assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -41,7 +41,6 @@
|
|||
//!
|
||||
//!
|
||||
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
|
||||
use crate::result::Result;
|
||||
use std::cmp;
|
||||
use std::convert::AsMut;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
@ -56,6 +55,8 @@ pub const NUM_CODING: usize = 4;
|
|||
/// Total number of blobs in an erasure set; includes data and coding blobs
|
||||
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
|
||||
|
||||
type Result<T> = std::result::Result<T, reed_solomon_erasure::Error>;
|
||||
|
||||
/// Represents an erasure "session" with a particular configuration and number of data and coding
|
||||
/// blobs
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
Loading…
Reference in New Issue