From b648f37b9701b676bd444cdcf40cd73da737b465 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Wed, 9 Jan 2019 10:21:55 -0800 Subject: [PATCH] encapsulate erasure_cf (#2349) --- src/db_ledger.rs | 2 +- src/db_window.rs | 42 +++++++++++++++++++----------------------- src/erasure.rs | 11 +++++------ 3 files changed, 25 insertions(+), 30 deletions(-) diff --git a/src/db_ledger.rs b/src/db_ledger.rs index bd562af50e..ab4f903170 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -277,7 +277,7 @@ pub struct DbLedger { db: Arc, meta_cf: MetaCf, data_cf: DataCf, - pub erasure_cf: ErasureCf, + erasure_cf: ErasureCf, } // TODO: Once we support a window that knows about different leader diff --git a/src/db_window.rs b/src/db_window.rs index c1cae642c5..e2a2f16609 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -198,12 +198,12 @@ pub fn process_blob( // Insert the new blob into the window let mut consumed_entries = if is_coding { - let erasure_key = ErasureCf::key(slot, pix); - let rblob = &blob.read().unwrap(); - let size = rblob.size()?; - db_ledger - .erasure_cf - .put(&erasure_key, &rblob.data[..BLOB_HEADER_SIZE + size])?; + let blob = &blob.read().unwrap(); + db_ledger.put_coding_blob_bytes( + slot, + pix, + &blob.data[..BLOB_HEADER_SIZE + blob.size().unwrap()], + )?; vec![] } else { db_ledger.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])? @@ -277,15 +277,12 @@ fn try_erasure(db_ledger: &Arc, consume_queue: &mut Vec) -> Res if let Some(meta) = meta { let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?; for c in coding { - let cl = c.read().unwrap(); - let erasure_key = ErasureCf::key( + let c = c.read().unwrap(); + db_ledger.put_coding_blob_bytes( meta.consumed_slot, - cl.index().expect("Recovered blob must set index"), - ); - let size = cl.size().expect("Recovered blob must set size"); - db_ledger - .erasure_cf - .put(&erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; + c.index().unwrap(), + &c.data[..BLOB_HEADER_SIZE + c.size().unwrap()], + )?; } let entries = db_ledger.write_shared_blobs(data)?; @@ -600,13 +597,13 @@ mod test { // Test erasing a data block and an erasure block let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING); - let erase_offset = coding_start % window.len(); + let erased_index = coding_start % window.len(); // Create a hole in the window - let erased_data = window[erase_offset].data.clone(); - let erased_coding = window[erase_offset].coding.clone().unwrap(); - window[erase_offset].data = None; - window[erase_offset].coding = None; + let erased_data = window[erased_index].data.clone(); + let erased_coding = window[erased_index].coding.clone().unwrap(); + window[erased_index].data = None; + window[erased_index].coding = None; // Generate the db_ledger from the window let ledger_path = get_tmp_ledger_path("test_try_erasure"); @@ -614,10 +611,10 @@ mod test { let mut consume_queue = vec![]; try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt"); - window[erase_offset].data = erased_data; + window[erased_index].data = erased_data; { - let data_blobs: Vec<_> = window[erase_offset..end_index] + let data_blobs: Vec<_> = window[erased_index..end_index] .iter() .map(|slot| slot.data.clone().unwrap()) .collect(); @@ -633,8 +630,7 @@ mod test { let erased_coding_l = erased_coding.read().unwrap(); assert_eq!( &db_ledger - .erasure_cf - .get_by_slot_index(slot_height, erase_offset as u64) + .get_coding_blob_bytes(slot_height, erased_index as u64) .unwrap() .unwrap()[BLOB_HEADER_SIZE..], &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize], diff --git a/src/erasure.rs b/src/erasure.rs index 8ce5b22b6f..4527e03f95 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -418,7 +418,7 @@ pub fn recover( // Add the coding blobs we have into the recovery vector, mark the missing ones for i in coding_start_idx..block_end_idx { - let result = db_ledger.erasure_cf.get_by_slot_index(slot, i)?; + let result = db_ledger.get_coding_blob_bytes(slot, i)?; categorize_blob( &result, @@ -508,10 +508,10 @@ pub fn recover( } if corrupt { - // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct - // the blobs again + // Remove the corrupted coding blobs so there's no effort wasted in trying to + // reconstruct the blobs again for i in coding_start_idx..block_end_idx { - db_ledger.erasure_cf.delete_by_slot_index(slot, i)?; + db_ledger.delete_coding_blob(slot, i)?; } return Ok((vec![], vec![])); } @@ -650,8 +650,7 @@ pub mod test { .expect("Expected coding blob to have valid data size"); db_ledger - .erasure_cf - .put_by_slot_index( + .put_coding_blob_bytes( coding_lock.slot().unwrap(), index, &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],