Debug broadcast (#2233)

* Account for duplicate blobs in process_blobs

* Increase max bytes for level base to match write buffer
carllin 2018-12-20 12:12:04 -08:00 committed by GitHub
parent 93fb61dc8f
commit 7148c14178
1 changed file with 93 additions and 17 deletions


@@ -21,7 +21,8 @@ pub use rocksdb::DBRawIterator;
pub const DB_LEDGER_DIRECTORY: &str = "rocksdb";
// A good value for this is the number of cores on the machine
pub const TOTAL_THREADS: i32 = 8;
const TOTAL_THREADS: i32 = 8;
const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024;
#[derive(Debug)]
pub enum DbLedgerError {
@@ -409,21 +410,33 @@ impl DbLedger {
let mut current_slot = lowest_slot;
'outer: loop {
let entry: Entry = {
let (next_new_blob, new_blob_index) = {
if index_into_blob < new_blobs.len() {
let blob = new_blobs[index_into_blob].borrow();
(Some(blob), Some(blob.index()?))
} else {
(None, None)
}
};
// Try to find the next blob we're looking for in the new_blobs
// vector
let mut found_blob = None;
while index_into_blob < new_blobs.len() {
let new_blob = new_blobs[index_into_blob].borrow();
let index = new_blob.index()?;
// Skip over duplicate blobs with the same index and continue
// until we either find the index we're looking for, or detect
// that the index doesn't exist in the new_blobs vector.
if index > current_index {
break;
}
if new_blob_index == Some(current_index) {
index_into_blob += 1;
let next_new_blob = next_new_blob.unwrap();
current_slot = next_new_blob.slot()?;
let serialized_entry_data = &next_new_blob.data
[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_new_blob.size()?];
if index == current_index {
found_blob = Some(new_blob);
}
}
// If we found the blob in the new_blobs vector, process it, otherwise,
// look for the blob in the database.
if let Some(next_blob) = found_blob {
current_slot = next_blob.slot()?;
let serialized_entry_data = &next_blob.data
[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()?];
// Verify entries can actually be reconstructed
deserialize(serialized_entry_data).expect(
"Blob made it past validation, so must be deserializable at this point",
@@ -437,7 +450,6 @@ impl DbLedger {
let key = DataCf::key(current_slot + 1, current_index);
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
current_slot += 1;
meta.consumed_slot = current_slot;
blob_data
} else {
break 'outer;
@@ -454,6 +466,7 @@ impl DbLedger {
consumed_queue.push(entry);
current_index += 1;
meta.consumed += 1;
meta.consumed_slot = current_slot;
}
}
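
The duplicate handling added in the hunks above boils down to a forward-only scan of the incoming blob list: consume every blob whose index is at or below the index currently being assembled, remember the one that matches, and only fall back to the database on a miss. A minimal, self-contained sketch of that scan, assuming plain (u64, String) pairs in place of shared blobs (find_next and the payloads are hypothetical names, not part of this codebase):

/// Scan `blobs` (sorted by index, possibly containing duplicates) from `*cursor`
/// and return the payload whose index equals `current_index`, if any.
/// Duplicates and already-passed indices are consumed; the cursor never moves back.
fn find_next<'a>(
    blobs: &'a [(u64, String)],
    cursor: &mut usize,
    current_index: u64,
) -> Option<&'a String> {
    let mut found = None;
    while *cursor < blobs.len() {
        let (index, payload) = &blobs[*cursor];
        // Past the index we want: it is not in this batch, so stop scanning.
        if *index > current_index {
            break;
        }
        // Consume the blob whether it is a duplicate or the match we want.
        *cursor += 1;
        if *index == current_index {
            found = Some(payload);
        }
    }
    found
}

fn main() {
    // Index 1 appears twice; only one copy is returned.
    let blobs = vec![
        (0, "a".to_string()),
        (1, "b".to_string()),
        (1, "b".to_string()),
        (2, "c".to_string()),
    ];
    let mut cursor = 0;
    for want in 0..3u64 {
        println!("index {} -> {:?}", want, find_next(&blobs, &mut cursor, want));
    }
}

In the real code the cursor is index_into_blob, the payload is the blob's serialized entry data, and a miss falls through to a lookup in the data column family before breaking out of the 'outer loop.
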
@@ -584,7 +597,8 @@ impl DbLedger {
fn get_cf_options() -> Options {
let mut options = Options::default();
options.set_max_write_buffer_number(32);
options.set_write_buffer_size(512 * 1024 * 1024);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
options
}
@@ -596,7 +610,8 @@ impl DbLedger {
options.set_max_background_flushes(4);
options.set_max_background_compactions(4);
options.set_max_write_buffer_number(32);
options.set_write_buffer_size(512 * 1024 * 1024);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
options
}
}
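
Both option constructors now pin max_bytes_for_level_base to the same 512 MiB as the write buffer, so a single flushed memtable fits the L1 target instead of overrunning RocksDB's smaller stock default and immediately forcing compaction. A standalone sketch of the same tuning against the rust-rocksdb crate (the database path and "data" column family are placeholders, and open_cf details vary slightly between crate versions):

use rocksdb::{Options, DB};

const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024;

fn tuned_options() -> Options {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options.set_max_write_buffer_number(32);
    options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
    // Keep the L1 target in step with the memtable size so compaction
    // pressure scales with data volume rather than with every flush.
    options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
    options
}

fn main() -> Result<(), rocksdb::Error> {
    // Placeholder path and column family, for illustration only.
    let db = DB::open_cf(&tuned_options(), "/tmp/example-ledger", &["data"])?;
    db.put(b"some_key", b"some_value")?;
    Ok(())
}
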
@@ -1004,6 +1019,67 @@ mod tests {
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_blobs_duplicate() {
// Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_duplicate");
{
let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
// Write entries
let num_entries = 10 as u64;
let num_duplicates = 2;
let original_entries: Vec<Entry> = make_tiny_test_entries(num_entries as usize)
.into_iter()
.flat_map(|e| vec![e; num_duplicates])
.collect();
let shared_blobs = original_entries.clone().to_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let index = (i / 2) as u64;
let mut w_b = b.write().unwrap();
w_b.set_index(index).unwrap();
w_b.set_slot(index).unwrap();
}
assert_eq!(
db_ledger
.write_shared_blobs(
shared_blobs
.iter()
.skip(num_duplicates)
.step_by(num_duplicates * 2)
)
.unwrap(),
vec![]
);
let expected: Vec<_> = original_entries
.into_iter()
.step_by(num_duplicates)
.collect();
assert_eq!(
db_ledger
.write_shared_blobs(shared_blobs.iter().step_by(num_duplicates * 2))
.unwrap(),
expected,
);
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
let meta = db_ledger
.meta_cf
.get(&db_ledger.db, &meta_key)
.unwrap()
.unwrap();
assert_eq!(meta.consumed, num_entries);
assert_eq!(meta.received, num_entries);
assert_eq!(meta.consumed_slot, num_entries - 1);
assert_eq!(meta.received_slot, num_entries - 1);
}
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
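
The iterator chains in test_insert_data_blobs_duplicate are easy to misread, so here is a small standalone calculation, using only the test's own parameters (num_duplicates = 2, 20 shared blobs, the blob at position p carrying index p / 2), of which positions each write_shared_blobs call selects:

fn main() {
    let num_duplicates: usize = 2;
    let total_blobs: usize = 20; // 10 entries, each present twice

    // First call: skip(2).step_by(4) picks one copy of every odd index.
    let first: Vec<usize> = (0..total_blobs)
        .skip(num_duplicates)
        .step_by(num_duplicates * 2)
        .collect();
    // Second call: step_by(4) picks one copy of every even index.
    let second: Vec<usize> = (0..total_blobs).step_by(num_duplicates * 2).collect();

    let indices = |positions: &[usize]| -> Vec<usize> {
        positions.iter().map(|p| *p / num_duplicates).collect()
    };
    println!("first write:  positions {:?} -> indices {:?}", first, indices(&first));
    println!("second write: positions {:?} -> indices {:?}", second, indices(&second));
}

Only after the second call does a consecutive run starting at index 0 exist, which is why the first call is asserted to return an empty vector and the second to return all ten original entries.
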
#[test]
pub fn test_write_consecutive_blobs() {
// Create RocksDb ledger