From 9ef5e51c0fa23b8be4224936b51aaa42ce960861 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 14 Dec 2018 17:05:41 -0800 Subject: [PATCH] Cleanup slot remnants in db_ledger (#2153) * Cleanup slot remnants in db_ledger --- benches/db_ledger.rs | 7 +++++-- src/broadcast_service.rs | 7 ++----- src/cluster_info.rs | 2 +- src/db_ledger.rs | 30 +++++++++++++++++++----------- src/db_window.rs | 25 ++++++++++++------------- src/erasure.rs | 17 ++++++++--------- 6 files changed, 47 insertions(+), 41 deletions(-) diff --git a/benches/db_ledger.rs b/benches/db_ledger.rs index 5f2f8a6b7..f265277d7 100644 --- a/benches/db_ledger.rs +++ b/benches/db_ledger.rs @@ -45,10 +45,13 @@ fn setup_read_bench( let mut entries = make_large_test_entries(num_large_blobs as usize); entries.extend(make_tiny_test_entries(num_small_blobs as usize)); - // Convert the entries to blobs, wrsite the blobs to the ledger + // Convert the entries to blobs, write the blobs to the ledger let shared_blobs = entries.to_blobs(); + for b in shared_blobs.iter() { + b.write().unwrap().set_slot(slot).unwrap(); + } db_ledger - .write_shared_blobs(slot, &shared_blobs) + .write_shared_blobs(&shared_blobs) .expect("Expectd successful insertion of blobs into ledger"); } diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index a38d64614..fb7d81a10 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -122,7 +122,7 @@ fn broadcast( trace!("{} null {}", id, pos); } - for (b, slot) in &blobs { + for (b, _) in &blobs { { let ix = b.read().unwrap().index().expect("blob index"); let pos = (ix % window_size) as usize; @@ -130,10 +130,7 @@ fn broadcast( assert!(win[pos].data.is_none()); win[pos].data = Some(b.clone()); } - db_ledger - .write() - .unwrap() - .write_shared_blobs(*slot, vec![b])?; + db_ledger.write().unwrap().write_shared_blobs(vec![b])?; } } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index a31ff2469..3e33252b6 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -1236,7 +1236,7 @@ mod tests { { let mut w_ledger = db_ledger.write().unwrap(); w_ledger - .write_shared_blobs(2, vec![&blob]) + .write_shared_blobs(vec![&blob]) .expect("Expect successful ledger write"); } diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 0a841ba18..ec36ce5f6 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -285,7 +285,7 @@ impl DbLedger { Ok(()) } - pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: I) -> Result> + pub fn write_shared_blobs(&mut self, shared_blobs: I) -> Result> where I: IntoIterator, I::Item: Borrow, @@ -294,6 +294,7 @@ impl DbLedger { for b in shared_blobs { let bl = b.borrow().read().unwrap(); let index = bl.index()?; + let slot = bl.slot()?; let key = DataCf::key(slot, index); let new_entries = self.insert_data_blob(&key, &*bl)?; entries.extend(new_entries); @@ -301,14 +302,14 @@ impl DbLedger { Ok(entries) } - pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result> + pub fn write_blobs<'a, I>(&mut self, blobs: I) -> Result> where I: IntoIterator, { let mut entries = vec![]; for blob in blobs.into_iter() { let index = blob.index()?; - let key = DataCf::key(slot, index); + let key = DataCf::key(blob.slot()?, index); let new_entries = self.insert_data_blob(&key, blob)?; entries.extend(new_entries); } @@ -322,11 +323,15 @@ impl DbLedger { { let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { let b = entry.borrow().to_blob(); - b.write().unwrap().set_index(idx as u64).unwrap(); + { + let mut w_b = b.write().unwrap(); + w_b.set_index(idx as u64).unwrap(); + w_b.set_slot(slot).unwrap(); + } b }); - self.write_shared_blobs(slot, shared_blobs) + self.write_shared_blobs(shared_blobs) } pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result> { @@ -547,10 +552,11 @@ where let b = entry.borrow().to_blob(); b.write().unwrap().set_index(idx as u64).unwrap(); b.write().unwrap().set_id(&keypair.pubkey()).unwrap(); + b.write().unwrap().set_slot(DEFAULT_SLOT_HEIGHT).unwrap(); b }); - db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?; + db_ledger.write_shared_blobs(blobs)?; Ok(()) } @@ -614,19 +620,19 @@ mod tests { #[test] fn test_get_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_blobs(); + let slot = DEFAULT_SLOT_HEIGHT; index_blobs( - shared_blobs.iter().zip(vec![0u64; 10].into_iter()), + shared_blobs.iter().zip(vec![slot; 10].into_iter()), &Keypair::new().pubkey(), 0, ); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); - let slot = DEFAULT_SLOT_HEIGHT; let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes"); let mut ledger = DbLedger::open(&ledger_path).unwrap(); - ledger.write_blobs(slot, &blobs).unwrap(); + ledger.write_blobs(&blobs).unwrap(); let mut buf = [0; 1024]; let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf, slot).unwrap(); @@ -815,11 +821,13 @@ mod tests { let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); for (i, b) in shared_blobs.iter().enumerate() { - b.write().unwrap().set_index(1 << (i * 8)).unwrap(); + let mut w_b = b.write().unwrap(); + w_b.set_index(1 << (i * 8)).unwrap(); + w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap(); } db_ledger - .write_shared_blobs(DEFAULT_SLOT_HEIGHT, &shared_blobs) + .write_shared_blobs(&shared_blobs) .expect("Expected successful write of blobs"); let mut db_iterator = db_ledger .db diff --git a/src/db_window.rs b/src/db_window.rs index fa1bbd36a..50b156308 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -404,10 +404,7 @@ fn try_erasure(db_ledger: &Arc>, consume_queue: &mut Vec .put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; } - let entries = db_ledger - .write() - .unwrap() - .write_shared_blobs(meta.consumed_slot, data)?; + let entries = db_ledger.write().unwrap().write_shared_blobs(data)?; consume_queue.extend(entries); } @@ -546,7 +543,7 @@ mod test { #[test] pub fn test_find_missing_data_indexes_sanity() { - let slot = 0; + let slot = DEFAULT_SLOT_HEIGHT; // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_sanity"); @@ -564,11 +561,12 @@ mod test { { let mut bl = shared_blob.write().unwrap(); bl.set_index(10).unwrap(); + bl.set_slot(slot).unwrap(); } // Insert one blob at index = first_index db_ledger - .write_blobs(slot, &vec![&*shared_blob.read().unwrap()]) + .write_blobs(&vec![&*shared_blob.read().unwrap()]) .unwrap(); // The first blob has index = first_index. Thus, for i < first_index, @@ -594,7 +592,7 @@ mod test { #[test] pub fn test_find_missing_data_indexes() { - let slot = 0; + let slot = DEFAULT_SLOT_HEIGHT; // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); @@ -605,11 +603,13 @@ mod test { let num_entries = 10; let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); for (i, b) in shared_blobs.iter().enumerate() { - b.write().unwrap().set_index(i as u64 * gap).unwrap(); + let mut w_b = b.write().unwrap(); + w_b.set_index(i as u64 * gap).unwrap(); + w_b.set_slot(slot).unwrap(); } let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); - db_ledger.write_blobs(slot, &blobs).unwrap(); + db_ledger.write_blobs(&blobs).unwrap(); // Index of the first blob is 0 // Index of the second blob is "gap" @@ -682,7 +682,7 @@ mod test { #[test] pub fn test_no_missing_blob_indexes() { - let slot = 0; + let slot = DEFAULT_SLOT_HEIGHT; // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); @@ -692,14 +692,14 @@ mod test { let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); index_blobs( - shared_blobs.iter().zip(vec![0u64; num_entries].into_iter()), + shared_blobs.iter().zip(vec![slot; num_entries].into_iter()), &Keypair::new().pubkey(), 0, ); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); - db_ledger.write_blobs(slot, &blobs).unwrap(); + db_ledger.write_blobs(&blobs).unwrap(); let empty: Vec = vec![]; for i in 0..num_entries as u64 { @@ -742,7 +742,6 @@ mod test { let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( &ledger_path, &window, - slot_height, false, ))); diff --git a/src/erasure.rs b/src/erasure.rs index 2a788ca83..862389542 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -634,7 +634,6 @@ pub mod test { pub fn generate_db_ledger_from_window( ledger_path: &str, window: &[WindowSlot], - slot_height: u64, use_random: bool, ) -> DbLedger { let mut db_ledger = @@ -649,14 +648,14 @@ pub mod test { .data_cf .put_by_slot_index( &db_ledger.db, - slot_height, + data_l.slot().unwrap(), data_l.index().unwrap(), &data_l.data[..data_l.data_size().unwrap() as usize], ) .expect("Expected successful put into data column of ledger"); } else { db_ledger - .write_shared_blobs(slot_height, vec![data].into_iter()) + .write_shared_blobs(vec![data].into_iter()) .unwrap(); } } @@ -670,13 +669,13 @@ pub mod test { let data_size = coding_lock .size() - .expect("Expected coding blob to have valid ata size"); + .expect("Expected coding blob to have valid data size"); db_ledger .erasure_cf .put_by_slot_index( &db_ledger.db, - slot_height, + coding_lock.slot().unwrap(), index, &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE], ) @@ -792,8 +791,10 @@ pub mod test { { // Make some dummy slots - let slot_tick_heights: Vec<(&SharedBlob, u64)> = - blobs.iter().zip(vec![0; blobs.len()]).collect(); + let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs + .iter() + .zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()]) + .collect(); index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); } @@ -844,7 +845,6 @@ pub mod test { let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( &ledger_path, &window, - DEFAULT_SLOT_HEIGHT, true, ))); @@ -899,7 +899,6 @@ pub mod test { let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( &ledger_path, &window, - DEFAULT_SLOT_HEIGHT, true, )));