Cleanup slot remnants in db_ledger (#2153)

* Cleanup slot remnants in db_ledger
This commit is contained in:
carllin 2018-12-14 17:05:41 -08:00 committed by GitHub
parent fe5566d642
commit 9ef5e51c0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 41 deletions

View File

@ -45,10 +45,13 @@ fn setup_read_bench(
let mut entries = make_large_test_entries(num_large_blobs as usize); let mut entries = make_large_test_entries(num_large_blobs as usize);
entries.extend(make_tiny_test_entries(num_small_blobs as usize)); entries.extend(make_tiny_test_entries(num_small_blobs as usize));
// Convert the entries to blobs, wrsite the blobs to the ledger // Convert the entries to blobs, write the blobs to the ledger
let shared_blobs = entries.to_blobs(); let shared_blobs = entries.to_blobs();
for b in shared_blobs.iter() {
b.write().unwrap().set_slot(slot).unwrap();
}
db_ledger db_ledger
.write_shared_blobs(slot, &shared_blobs) .write_shared_blobs(&shared_blobs)
.expect("Expectd successful insertion of blobs into ledger"); .expect("Expectd successful insertion of blobs into ledger");
} }

View File

@ -122,7 +122,7 @@ fn broadcast(
trace!("{} null {}", id, pos); trace!("{} null {}", id, pos);
} }
for (b, slot) in &blobs { for (b, _) in &blobs {
{ {
let ix = b.read().unwrap().index().expect("blob index"); let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize; let pos = (ix % window_size) as usize;
@ -130,10 +130,7 @@ fn broadcast(
assert!(win[pos].data.is_none()); assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone()); win[pos].data = Some(b.clone());
} }
db_ledger db_ledger.write().unwrap().write_shared_blobs(vec![b])?;
.write()
.unwrap()
.write_shared_blobs(*slot, vec![b])?;
} }
} }

View File

@ -1236,7 +1236,7 @@ mod tests {
{ {
let mut w_ledger = db_ledger.write().unwrap(); let mut w_ledger = db_ledger.write().unwrap();
w_ledger w_ledger
.write_shared_blobs(2, vec![&blob]) .write_shared_blobs(vec![&blob])
.expect("Expect successful ledger write"); .expect("Expect successful ledger write");
} }

View File

@ -285,7 +285,7 @@ impl DbLedger {
Ok(()) Ok(())
} }
pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<Vec<Entry>> pub fn write_shared_blobs<I>(&mut self, shared_blobs: I) -> Result<Vec<Entry>>
where where
I: IntoIterator, I: IntoIterator,
I::Item: Borrow<SharedBlob>, I::Item: Borrow<SharedBlob>,
@ -294,6 +294,7 @@ impl DbLedger {
for b in shared_blobs { for b in shared_blobs {
let bl = b.borrow().read().unwrap(); let bl = b.borrow().read().unwrap();
let index = bl.index()?; let index = bl.index()?;
let slot = bl.slot()?;
let key = DataCf::key(slot, index); let key = DataCf::key(slot, index);
let new_entries = self.insert_data_blob(&key, &*bl)?; let new_entries = self.insert_data_blob(&key, &*bl)?;
entries.extend(new_entries); entries.extend(new_entries);
@ -301,14 +302,14 @@ impl DbLedger {
Ok(entries) Ok(entries)
} }
pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<Vec<Entry>> pub fn write_blobs<'a, I>(&mut self, blobs: I) -> Result<Vec<Entry>>
where where
I: IntoIterator<Item = &'a &'a Blob>, I: IntoIterator<Item = &'a &'a Blob>,
{ {
let mut entries = vec![]; let mut entries = vec![];
for blob in blobs.into_iter() { for blob in blobs.into_iter() {
let index = blob.index()?; let index = blob.index()?;
let key = DataCf::key(slot, index); let key = DataCf::key(blob.slot()?, index);
let new_entries = self.insert_data_blob(&key, blob)?; let new_entries = self.insert_data_blob(&key, blob)?;
entries.extend(new_entries); entries.extend(new_entries);
} }
@ -322,11 +323,15 @@ impl DbLedger {
{ {
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
let b = entry.borrow().to_blob(); let b = entry.borrow().to_blob();
b.write().unwrap().set_index(idx as u64).unwrap(); {
let mut w_b = b.write().unwrap();
w_b.set_index(idx as u64).unwrap();
w_b.set_slot(slot).unwrap();
}
b b
}); });
self.write_shared_blobs(slot, shared_blobs) self.write_shared_blobs(shared_blobs)
} }
pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> { pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
@ -547,10 +552,11 @@ where
let b = entry.borrow().to_blob(); let b = entry.borrow().to_blob();
b.write().unwrap().set_index(idx as u64).unwrap(); b.write().unwrap().set_index(idx as u64).unwrap();
b.write().unwrap().set_id(&keypair.pubkey()).unwrap(); b.write().unwrap().set_id(&keypair.pubkey()).unwrap();
b.write().unwrap().set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
b b
}); });
db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?; db_ledger.write_shared_blobs(blobs)?;
Ok(()) Ok(())
} }
@ -614,19 +620,19 @@ mod tests {
#[test] #[test]
fn test_get_blobs_bytes() { fn test_get_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_blobs(); let shared_blobs = make_tiny_test_entries(10).to_blobs();
let slot = DEFAULT_SLOT_HEIGHT;
index_blobs( index_blobs(
shared_blobs.iter().zip(vec![0u64; 10].into_iter()), shared_blobs.iter().zip(vec![slot; 10].into_iter()),
&Keypair::new().pubkey(), &Keypair::new().pubkey(),
0, 0,
); );
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
let slot = DEFAULT_SLOT_HEIGHT;
let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes"); let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes");
let mut ledger = DbLedger::open(&ledger_path).unwrap(); let mut ledger = DbLedger::open(&ledger_path).unwrap();
ledger.write_blobs(slot, &blobs).unwrap(); ledger.write_blobs(&blobs).unwrap();
let mut buf = [0; 1024]; let mut buf = [0; 1024];
let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf, slot).unwrap(); let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf, slot).unwrap();
@ -815,11 +821,13 @@ mod tests {
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (i, b) in shared_blobs.iter().enumerate() { for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(1 << (i * 8)).unwrap(); let mut w_b = b.write().unwrap();
w_b.set_index(1 << (i * 8)).unwrap();
w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
} }
db_ledger db_ledger
.write_shared_blobs(DEFAULT_SLOT_HEIGHT, &shared_blobs) .write_shared_blobs(&shared_blobs)
.expect("Expected successful write of blobs"); .expect("Expected successful write of blobs");
let mut db_iterator = db_ledger let mut db_iterator = db_ledger
.db .db

View File

@ -404,10 +404,7 @@ fn try_erasure(db_ledger: &Arc<RwLock<DbLedger>>, consume_queue: &mut Vec<Entry>
.put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; .put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?;
} }
let entries = db_ledger let entries = db_ledger.write().unwrap().write_shared_blobs(data)?;
.write()
.unwrap()
.write_shared_blobs(meta.consumed_slot, data)?;
consume_queue.extend(entries); consume_queue.extend(entries);
} }
@ -546,7 +543,7 @@ mod test {
#[test] #[test]
pub fn test_find_missing_data_indexes_sanity() { pub fn test_find_missing_data_indexes_sanity() {
let slot = 0; let slot = DEFAULT_SLOT_HEIGHT;
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_sanity"); let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_sanity");
@ -564,11 +561,12 @@ mod test {
{ {
let mut bl = shared_blob.write().unwrap(); let mut bl = shared_blob.write().unwrap();
bl.set_index(10).unwrap(); bl.set_index(10).unwrap();
bl.set_slot(slot).unwrap();
} }
// Insert one blob at index = first_index // Insert one blob at index = first_index
db_ledger db_ledger
.write_blobs(slot, &vec![&*shared_blob.read().unwrap()]) .write_blobs(&vec![&*shared_blob.read().unwrap()])
.unwrap(); .unwrap();
// The first blob has index = first_index. Thus, for i < first_index, // The first blob has index = first_index. Thus, for i < first_index,
@ -594,7 +592,7 @@ mod test {
#[test] #[test]
pub fn test_find_missing_data_indexes() { pub fn test_find_missing_data_indexes() {
let slot = 0; let slot = DEFAULT_SLOT_HEIGHT;
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes");
let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap();
@ -605,11 +603,13 @@ mod test {
let num_entries = 10; let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (i, b) in shared_blobs.iter().enumerate() { for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64 * gap).unwrap(); let mut w_b = b.write().unwrap();
w_b.set_index(i as u64 * gap).unwrap();
w_b.set_slot(slot).unwrap();
} }
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
db_ledger.write_blobs(slot, &blobs).unwrap(); db_ledger.write_blobs(&blobs).unwrap();
// Index of the first blob is 0 // Index of the first blob is 0
// Index of the second blob is "gap" // Index of the second blob is "gap"
@ -682,7 +682,7 @@ mod test {
#[test] #[test]
pub fn test_no_missing_blob_indexes() { pub fn test_no_missing_blob_indexes() {
let slot = 0; let slot = DEFAULT_SLOT_HEIGHT;
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes");
let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap();
@ -692,14 +692,14 @@ mod test {
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
index_blobs( index_blobs(
shared_blobs.iter().zip(vec![0u64; num_entries].into_iter()), shared_blobs.iter().zip(vec![slot; num_entries].into_iter()),
&Keypair::new().pubkey(), &Keypair::new().pubkey(),
0, 0,
); );
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
db_ledger.write_blobs(slot, &blobs).unwrap(); db_ledger.write_blobs(&blobs).unwrap();
let empty: Vec<u64> = vec![]; let empty: Vec<u64> = vec![];
for i in 0..num_entries as u64 { for i in 0..num_entries as u64 {
@ -742,7 +742,6 @@ mod test {
let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window(
&ledger_path, &ledger_path,
&window, &window,
slot_height,
false, false,
))); )));

View File

@ -634,7 +634,6 @@ pub mod test {
pub fn generate_db_ledger_from_window( pub fn generate_db_ledger_from_window(
ledger_path: &str, ledger_path: &str,
window: &[WindowSlot], window: &[WindowSlot],
slot_height: u64,
use_random: bool, use_random: bool,
) -> DbLedger { ) -> DbLedger {
let mut db_ledger = let mut db_ledger =
@ -649,14 +648,14 @@ pub mod test {
.data_cf .data_cf
.put_by_slot_index( .put_by_slot_index(
&db_ledger.db, &db_ledger.db,
slot_height, data_l.slot().unwrap(),
data_l.index().unwrap(), data_l.index().unwrap(),
&data_l.data[..data_l.data_size().unwrap() as usize], &data_l.data[..data_l.data_size().unwrap() as usize],
) )
.expect("Expected successful put into data column of ledger"); .expect("Expected successful put into data column of ledger");
} else { } else {
db_ledger db_ledger
.write_shared_blobs(slot_height, vec![data].into_iter()) .write_shared_blobs(vec![data].into_iter())
.unwrap(); .unwrap();
} }
} }
@ -670,13 +669,13 @@ pub mod test {
let data_size = coding_lock let data_size = coding_lock
.size() .size()
.expect("Expected coding blob to have valid ata size"); .expect("Expected coding blob to have valid data size");
db_ledger db_ledger
.erasure_cf .erasure_cf
.put_by_slot_index( .put_by_slot_index(
&db_ledger.db, &db_ledger.db,
slot_height, coding_lock.slot().unwrap(),
index, index,
&coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE], &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
) )
@ -792,8 +791,10 @@ pub mod test {
{ {
// Make some dummy slots // Make some dummy slots
let slot_tick_heights: Vec<(&SharedBlob, u64)> = let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs
blobs.iter().zip(vec![0; blobs.len()]).collect(); .iter()
.zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()])
.collect();
index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
} }
@ -844,7 +845,6 @@ pub mod test {
let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window(
&ledger_path, &ledger_path,
&window, &window,
DEFAULT_SLOT_HEIGHT,
true, true,
))); )));
@ -899,7 +899,6 @@ pub mod test {
let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window(
&ledger_path, &ledger_path,
&window, &window,
DEFAULT_SLOT_HEIGHT,
true, true,
))); )));