encapsulate erasure_cf (#2349)
This commit is contained in:
parent
78d3b83900
commit
b648f37b97
|
@ -277,7 +277,7 @@ pub struct DbLedger {
|
|||
db: Arc<DB>,
|
||||
meta_cf: MetaCf,
|
||||
data_cf: DataCf,
|
||||
pub erasure_cf: ErasureCf,
|
||||
erasure_cf: ErasureCf,
|
||||
}
|
||||
|
||||
// TODO: Once we support a window that knows about different leader
|
||||
|
|
|
@ -198,12 +198,12 @@ pub fn process_blob(
|
|||
|
||||
// Insert the new blob into the window
|
||||
let mut consumed_entries = if is_coding {
|
||||
let erasure_key = ErasureCf::key(slot, pix);
|
||||
let rblob = &blob.read().unwrap();
|
||||
let size = rblob.size()?;
|
||||
db_ledger
|
||||
.erasure_cf
|
||||
.put(&erasure_key, &rblob.data[..BLOB_HEADER_SIZE + size])?;
|
||||
let blob = &blob.read().unwrap();
|
||||
db_ledger.put_coding_blob_bytes(
|
||||
slot,
|
||||
pix,
|
||||
&blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()],
|
||||
)?;
|
||||
vec![]
|
||||
} else {
|
||||
db_ledger.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?
|
||||
|
@ -277,15 +277,12 @@ fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Res
|
|||
if let Some(meta) = meta {
|
||||
let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
|
||||
for c in coding {
|
||||
let cl = c.read().unwrap();
|
||||
let erasure_key = ErasureCf::key(
|
||||
let c = c.read().unwrap();
|
||||
db_ledger.put_coding_blob_bytes(
|
||||
meta.consumed_slot,
|
||||
cl.index().expect("Recovered blob must set index"),
|
||||
);
|
||||
let size = cl.size().expect("Recovered blob must set size");
|
||||
db_ledger
|
||||
.erasure_cf
|
||||
.put(&erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?;
|
||||
c.index().unwrap(),
|
||||
&c.data[..BLOB_HEADER_SIZE + c.size().unwrap()],
|
||||
)?;
|
||||
}
|
||||
|
||||
let entries = db_ledger.write_shared_blobs(data)?;
|
||||
|
@ -600,13 +597,13 @@ mod test {
|
|||
// Test erasing a data block and an erasure block
|
||||
let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
|
||||
|
||||
let erase_offset = coding_start % window.len();
|
||||
let erased_index = coding_start % window.len();
|
||||
|
||||
// Create a hole in the window
|
||||
let erased_data = window[erase_offset].data.clone();
|
||||
let erased_coding = window[erase_offset].coding.clone().unwrap();
|
||||
window[erase_offset].data = None;
|
||||
window[erase_offset].coding = None;
|
||||
let erased_data = window[erased_index].data.clone();
|
||||
let erased_coding = window[erased_index].coding.clone().unwrap();
|
||||
window[erased_index].data = None;
|
||||
window[erased_index].coding = None;
|
||||
|
||||
// Generate the db_ledger from the window
|
||||
let ledger_path = get_tmp_ledger_path("test_try_erasure");
|
||||
|
@ -614,10 +611,10 @@ mod test {
|
|||
|
||||
let mut consume_queue = vec![];
|
||||
try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt");
|
||||
window[erase_offset].data = erased_data;
|
||||
window[erased_index].data = erased_data;
|
||||
|
||||
{
|
||||
let data_blobs: Vec<_> = window[erase_offset..end_index]
|
||||
let data_blobs: Vec<_> = window[erased_index..end_index]
|
||||
.iter()
|
||||
.map(|slot| slot.data.clone().unwrap())
|
||||
.collect();
|
||||
|
@ -633,8 +630,7 @@ mod test {
|
|||
let erased_coding_l = erased_coding.read().unwrap();
|
||||
assert_eq!(
|
||||
&db_ledger
|
||||
.erasure_cf
|
||||
.get_by_slot_index(slot_height, erase_offset as u64)
|
||||
.get_coding_blob_bytes(slot_height, erased_index as u64)
|
||||
.unwrap()
|
||||
.unwrap()[BLOB_HEADER_SIZE..],
|
||||
&erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
|
||||
|
|
|
@ -418,7 +418,7 @@ pub fn recover(
|
|||
|
||||
// Add the coding blobs we have into the recovery vector, mark the missing ones
|
||||
for i in coding_start_idx..block_end_idx {
|
||||
let result = db_ledger.erasure_cf.get_by_slot_index(slot, i)?;
|
||||
let result = db_ledger.get_coding_blob_bytes(slot, i)?;
|
||||
|
||||
categorize_blob(
|
||||
&result,
|
||||
|
@ -508,10 +508,10 @@ pub fn recover(
|
|||
}
|
||||
|
||||
if corrupt {
|
||||
// Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct
|
||||
// the blobs again
|
||||
// Remove the corrupted coding blobs so there's no effort wasted in trying to
|
||||
// reconstruct the blobs again
|
||||
for i in coding_start_idx..block_end_idx {
|
||||
db_ledger.erasure_cf.delete_by_slot_index(slot, i)?;
|
||||
db_ledger.delete_coding_blob(slot, i)?;
|
||||
}
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
@ -650,8 +650,7 @@ pub mod test {
|
|||
.expect("Expected coding blob to have valid data size");
|
||||
|
||||
db_ledger
|
||||
.erasure_cf
|
||||
.put_by_slot_index(
|
||||
.put_coding_blob_bytes(
|
||||
coding_lock.slot().unwrap(),
|
||||
index,
|
||||
&coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
|
||||
|
|
Loading…
Reference in New Issue