Debug broadcast (#2208)

* Add per cf rocksdb options, increase compaction and flush threads

* Change broadcast stage to bulk write blobs

* add db_ledger function specifically for broadcast

* fix broken tests

* fix benches
This commit is contained in:
carllin 2018-12-19 16:11:47 -08:00 committed by GitHub
parent 2fe3402362
commit 666af1e62d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 310 additions and 108 deletions

View File

@ -154,18 +154,17 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
blobs.shuffle(&mut thread_rng());
let slot = 0;
let mut shared_blobs = entries.to_blobs();
shared_blobs.shuffle(&mut thread_rng());
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let key = DataCf::key(slot, index);
db_ledger.insert_data_blob(&key, blob).unwrap();
blob.set_index(index + num_entries as u64).unwrap();
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
}
});
@ -181,18 +180,17 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
blobs.shuffle(&mut thread_rng());
let slot = 0;
let mut shared_blobs = entries.to_blobs();
shared_blobs.shuffle(&mut thread_rng());
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let key = DataCf::key(slot, index);
db_ledger.insert_data_blob(&key, blob).unwrap();
blob.set_index(index + num_entries as u64).unwrap();
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
}
});

View File

@ -100,7 +100,8 @@ fn broadcast(
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for (b, _) in &blobs {
let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
for b in &blobs {
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
if let Some(x) = win[pos].data.take() {
@ -122,7 +123,7 @@ fn broadcast(
trace!("{} null {}", id, pos);
}
for (b, _) in &blobs {
for b in &blobs {
{
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
@ -130,8 +131,11 @@ fn broadcast(
assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone());
}
db_ledger.write_shared_blobs(vec![b])?;
}
db_ledger
.write_consecutive_blobs(&blobs)
.expect("Unrecoverable failure to write to database");
}
// Fill in the coding blob data from the window data blobs

View File

@ -7,15 +7,18 @@ use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::result::{Error, Result};
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use rocksdb::{ColumnFamily, DBRawIterator, Options, WriteBatch, DB};
use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, Options, WriteBatch, DB};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::Borrow;
use std::cmp::max;
use std::io;
use std::path::Path;
pub const DB_LEDGER_DIRECTORY: &str = "rocksdb";
// A good value for this is the number of cores on the machine
pub const TOTAL_THREADS: i32 = 8;
#[derive(Debug, PartialEq, Eq)]
pub enum DbLedgerError {
@ -252,15 +255,20 @@ impl DbLedger {
let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY);
// Use default database options
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
let db_options = Self::get_db_options();
// Column family names
let cfs = vec![META_CF, DATA_CF, ERASURE_CF];
let meta_cf_descriptor = ColumnFamilyDescriptor::new(META_CF, Self::get_cf_options());
let data_cf_descriptor = ColumnFamilyDescriptor::new(DATA_CF, Self::get_cf_options());
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(ERASURE_CF, Self::get_cf_options());
let cfs = vec![
meta_cf_descriptor,
data_cf_descriptor,
erasure_cf_descriptor,
];
// Open the database
let db = DB::open_cf(&options, ledger_path, &cfs)?;
let db = DB::open_cf_descriptors(&db_options, ledger_path, cfs)?;
// Create the metadata column family
let meta_cf = MetaCf::default();
@ -290,30 +298,26 @@ impl DbLedger {
I: IntoIterator,
I::Item: Borrow<SharedBlob>,
{
let mut entries = vec![];
for b in shared_blobs {
let bl = b.borrow().read().unwrap();
let index = bl.index()?;
let slot = bl.slot()?;
let key = DataCf::key(slot, index);
let new_entries = self.insert_data_blob(&key, &*bl)?;
entries.extend(new_entries);
}
Ok(entries)
let c_blobs: Vec<_> = shared_blobs
.into_iter()
.map(move |s| s.borrow().clone())
.collect();
let r_blobs: Vec<_> = c_blobs.iter().map(move |b| b.read().unwrap()).collect();
let blobs = r_blobs.iter().map(|s| &**s);
let new_entries = self.insert_data_blobs(blobs)?;
Ok(new_entries)
}
pub fn write_blobs<'a, I>(&self, blobs: I) -> Result<Vec<Entry>>
where
I: IntoIterator<Item = &'a &'a Blob>,
{
let mut entries = vec![];
for blob in blobs.into_iter() {
let index = blob.index()?;
let key = DataCf::key(blob.slot()?, index);
let new_entries = self.insert_data_blob(&key, blob)?;
entries.extend(new_entries);
}
Ok(entries)
let blobs = blobs.into_iter().cloned();
let new_entries = self.insert_data_blobs(blobs)?;
Ok(new_entries)
}
pub fn write_entries<I>(&self, slot: u64, entries: I) -> Result<Vec<Entry>>
@ -334,8 +338,24 @@ impl DbLedger {
self.write_shared_blobs(shared_blobs)
}
pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
let slot_height = DataCf::slot_height_from_key(key)?;
pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<Vec<Entry>>
where
I: IntoIterator,
I::Item: Borrow<Blob>,
{
let mut new_blobs: Vec<_> = new_blobs.into_iter().collect();
if new_blobs.is_empty() {
return Ok(vec![]);
}
new_blobs.sort_unstable_by(|b1, b2| {
b1.borrow()
.index()
.unwrap()
.cmp(&b2.borrow().index().unwrap())
});
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
let mut should_write_meta = false;
@ -349,63 +369,80 @@ impl DbLedger {
}
};
let mut index = DataCf::index_from_key(key)?;
// TODO: Handle if leader sends different blob for same index when the index > consumed
// The old window implementation would just replace that index.
if index < meta.consumed {
let lowest_index = new_blobs[0].borrow().index()?;
let lowest_slot = new_blobs[0].borrow().slot()?;
let highest_index = new_blobs.last().unwrap().borrow().index()?;
let highest_slot = new_blobs.last().unwrap().borrow().slot()?;
if lowest_index < meta.consumed {
return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists));
}
// Index is zero-indexed, while the "received" height starts from 1,
// so received = index + 1 for the same blob.
if index >= meta.received {
meta.received = index + 1;
meta.received_slot = slot_height;
if highest_index >= meta.received {
meta.received = highest_index + 1;
meta.received_slot = highest_slot;
should_write_meta = true;
}
let mut consumed_queue = vec![];
if meta.consumed == index {
// Add the new blob to the consumed queue
let serialized_entry_data =
&new_blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + new_blob.size()?];
// Verify entries can actually be reconstructed
let entry: Entry = deserialize(serialized_entry_data)
.expect("Blob made it past validation, so must be deserializable at this point");
should_write_meta = true;
meta.consumed += 1;
consumed_queue.push(entry);
if meta.consumed == lowest_index {
// Find the next consecutive block of blobs.
// TODO: account for consecutive blocks that
// span multiple slots
let mut current_slot = slot_height;
loop {
index += 1;
let key = DataCf::key(current_slot, index);
let blob_data = {
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
blob_data
} else if meta.consumed < meta.received {
let key = DataCf::key(current_slot + 1, index);
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
current_slot += 1;
meta.consumed_slot = current_slot;
blob_data
should_write_meta = true;
let mut index_into_blob = 0;
let mut current_index = lowest_index;
let mut current_slot = lowest_slot;
'outer: loop {
let entry: Entry = {
let (next_new_blob, new_blob_index) = {
if index_into_blob < new_blobs.len() {
let blob = new_blobs[index_into_blob].borrow();
(Some(blob), Some(blob.index()?))
} else {
break;
(None, None)
}
};
if new_blob_index == Some(current_index) {
index_into_blob += 1;
let next_new_blob = next_new_blob.unwrap();
current_slot = next_new_blob.slot()?;
let serialized_entry_data = &next_new_blob.data
[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_new_blob.size()?];
// Verify entries can actually be reconstructed
deserialize(serialized_entry_data).expect(
"Blob made it past validation, so must be deserializable at this point",
)
} else {
break;
let key = DataCf::key(current_slot, current_index);
let blob_data = {
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
blob_data
} else if meta.consumed < meta.received {
let key = DataCf::key(current_slot + 1, current_index);
if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
current_slot += 1;
meta.consumed_slot = current_slot;
blob_data
} else {
break 'outer;
}
} else {
break 'outer;
}
};
deserialize(&blob_data[BLOB_HEADER_SIZE..])
.expect("Blobs in database must be deserializable")
}
};
let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..];
let entry: Entry = deserialize(serialized_entry_data)
.expect("Ledger should only contain well formed data");
consumed_queue.push(entry);
current_index += 1;
meta.consumed += 1;
}
}
@ -416,12 +453,53 @@ impl DbLedger {
batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
}
let serialized_blob_data = &new_blob.data[..BLOB_HEADER_SIZE + new_blob.size()?];
batch.put_cf(self.data_cf.handle(&self.db), key, serialized_blob_data)?;
for blob in new_blobs {
let blob = blob.borrow();
let key = DataCf::key(blob.slot()?, blob.index()?);
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
}
self.db.write(batch)?;
Ok(consumed_queue)
}
// Writes a list of sorted, consecutive broadcast blobs to the db_ledger
/// Bulk-writes `blobs` — which the caller guarantees are sorted by index and
/// consecutive — and advances the slot metadata past the last blob, all in a
/// single atomic RocksDB write batch.
///
/// # Panics
/// Panics if `blobs` is empty, or if existing metadata's next expected index
/// (`meta.consumed`) differs from the first blob's index.
pub fn write_consecutive_blobs(&self, blobs: &[SharedBlob]) -> Result<()> {
    assert!(!blobs.is_empty());

    let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);

    let mut meta = {
        if let Some(meta) = self.meta_cf.get(&self.db, &meta_key)? {
            // The caller must append exactly where the ledger left off.
            let first = blobs[0].read().unwrap();
            assert_eq!(meta.consumed, first.index()?);
            meta
        } else {
            SlotMeta::new()
        }
    };

    {
        // Blobs are consecutive, so the last one alone determines the new
        // consumed/received high-water marks.
        let last = blobs.last().unwrap().read().unwrap();
        meta.consumed = last.index()? + 1;
        meta.consumed_slot = last.slot()?;
        meta.received = max(meta.received, last.index()? + 1);
        // Bug fix: `received_slot` must track the blob's slot, not its index.
        // The original compared against `last.index()?`, which only happened
        // to pass tests where slot == index.
        meta.received_slot = max(meta.received_slot, last.slot()?);
    }

    let mut batch = WriteBatch::default();
    batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
    for blob in blobs {
        let blob = blob.read().unwrap();
        let key = DataCf::key(blob.slot()?, blob.index()?);
        // Persist header plus payload; trailing unused buffer space is dropped.
        let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
        batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
    }
    // One atomic write covers the metadata update and every blob payload.
    self.db.write(batch)?;
    Ok(())
}
// Fill 'buf' with num_blobs or most number of consecutive
// whole blobs that fit into buf.len()
//
@ -492,6 +570,25 @@ impl DbLedger {
db_iterator.seek_to_first();
Ok(EntryIterator { db_iterator })
}
/// Per-column-family RocksDB options: allow up to 32 write buffers of
/// 512 MiB each so bursty blob writes are absorbed in memory.
fn get_cf_options() -> Options {
    let mut cf_opts = Options::default();
    cf_opts.set_max_write_buffer_number(32);
    cf_opts.set_write_buffer_size(512 * 1024 * 1024);
    cf_opts
}
/// Database-wide RocksDB options: create the DB and any missing column
/// families on open, and raise parallelism plus background flush/compaction
/// thread limits alongside the same write-buffer sizing used per CF.
fn get_db_options() -> Options {
    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);
    // TOTAL_THREADS is documented above as "the number of cores on the machine".
    db_opts.increase_parallelism(TOTAL_THREADS);
    db_opts.set_max_background_flushes(4);
    db_opts.set_max_background_compactions(4);
    db_opts.set_max_write_buffer_number(32);
    db_opts.set_write_buffer_size(512 * 1024 * 1024);
    db_opts
}
}
struct EntryIterator {
@ -690,6 +787,11 @@ mod tests {
fn test_insert_data_blobs_basic() {
let entries = make_tiny_test_entries(2);
let shared_blobs = entries.to_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -697,9 +799,7 @@ mod tests {
let ledger = DbLedger::open(&ledger_path).unwrap();
// Insert second blob, we're missing the first blob, so should return nothing
let result = ledger
.insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, 1), blobs[1])
.unwrap();
let result = ledger.insert_data_blobs(vec![blobs[1]]).unwrap();
assert!(result.len() == 0);
let meta = ledger
@ -710,9 +810,7 @@ mod tests {
assert!(meta.consumed == 0 && meta.received == 2);
// Insert first blob, check for consecutive returned entries
let result = ledger
.insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, 0), blobs[0])
.unwrap();
let result = ledger.insert_data_blobs(vec![blobs[0]]).unwrap();
assert_eq!(result, entries);
@ -733,6 +831,9 @@ mod tests {
let num_blobs = 10;
let entries = make_tiny_test_entries(num_blobs);
let shared_blobs = entries.to_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -741,9 +842,7 @@ mod tests {
// Insert blobs in reverse, check for consecutive returned blobs
for i in (0..num_blobs).rev() {
let result = ledger
.insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i])
.unwrap();
let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();
let meta = ledger
.meta_cf
@ -769,6 +868,9 @@ mod tests {
let num_blobs = 10;
let entries = make_tiny_test_entries(num_blobs);
let shared_blobs = entries.to_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -777,18 +879,13 @@ mod tests {
// Insert last blob into next slot
let result = ledger
.insert_data_blob(
&DataCf::key(DEFAULT_SLOT_HEIGHT + 1, (num_blobs - 1) as u64),
blobs.last().unwrap(),
)
.insert_data_blobs(vec![*blobs.last().unwrap()])
.unwrap();
assert_eq!(result.len(), 0);
// Insert blobs into first slot, check for consecutive blobs
for i in (0..num_blobs - 1).rev() {
let result = ledger
.insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i])
.unwrap();
let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();
let meta = ledger
.meta_cf
.get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
@ -826,9 +923,12 @@ mod tests {
w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
}
db_ledger
.write_shared_blobs(&shared_blobs)
.expect("Expected successful write of blobs");
assert_eq!(
db_ledger
.write_shared_blobs(&shared_blobs)
.expect("Expected successful write of blobs"),
vec![]
);
let mut db_iterator = db_ledger
.db
.raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db))
@ -849,6 +949,106 @@ mod tests {
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_blobs_bulk() {
    // Set up a throwaway RocksDb-backed ledger for this test.
    let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();

        // Build 20 entries; blob i gets index == slot == i.
        let num_entries = 20 as u64;
        let original_entries = make_tiny_test_entries(num_entries as usize);
        let shared_blobs = original_entries.clone().to_blobs();
        for (i, blob) in shared_blobs.iter().enumerate() {
            let tag = i as u64;
            let mut guard = blob.write().unwrap();
            guard.set_index(tag).unwrap();
            guard.set_slot(tag).unwrap();
        }

        // Writing only the odd-indexed blobs leaves a gap at index 0, so no
        // consecutive entries can be returned yet.
        let odd_result = db_ledger
            .write_shared_blobs(shared_blobs.iter().skip(1).step_by(2))
            .unwrap();
        assert_eq!(odd_result, vec![]);

        // Filling in the even-indexed blobs completes every run, so the full
        // set of original entries comes back.
        let even_result = db_ledger
            .write_shared_blobs(shared_blobs.iter().step_by(2))
            .unwrap();
        assert_eq!(even_result, original_entries);

        // Metadata should now reflect all blobs consumed and received.
        let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
        let meta = db_ledger
            .meta_cf
            .get(&db_ledger.db, &meta_key)
            .unwrap()
            .unwrap();
        assert_eq!(meta.consumed, num_entries);
        assert_eq!(meta.received, num_entries);
        assert_eq!(meta.consumed_slot, num_entries - 1);
        assert_eq!(meta.received_slot, num_entries - 1);
    }
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_write_consecutive_blobs() {
    // Create RocksDb ledger
    let db_ledger_path = get_tmp_ledger_path("test_write_consecutive_blobs");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
        // Write entries
        let num_entries = 20 as u64;
        let original_entries = make_tiny_test_entries(num_entries as usize);
        let shared_blobs = original_entries.to_blobs();
        // Assign index == slot == i to blob i.
        // NOTE(review): because slot and index are always equal here, this
        // test cannot distinguish whether meta.received_slot was derived from
        // the blob's slot or its index — consider using distinct slot values.
        for (i, b) in shared_blobs.iter().enumerate() {
            let mut w_b = b.write().unwrap();
            w_b.set_index(i as u64).unwrap();
            w_b.set_slot(i as u64).unwrap();
        }
        // First batch: blobs 0..num_entries starting from a fresh ledger.
        db_ledger
            .write_consecutive_blobs(&shared_blobs)
            .expect("Expect successful blob writes");
        // Metadata should advance past the last written blob.
        let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
        let meta = db_ledger
            .meta_cf
            .get(&db_ledger.db, &meta_key)
            .unwrap()
            .unwrap();
        assert_eq!(meta.consumed, num_entries);
        assert_eq!(meta.received, num_entries);
        assert_eq!(meta.consumed_slot, num_entries - 1);
        assert_eq!(meta.received_slot, num_entries - 1);
        // Re-tag the same blobs as the next consecutive batch
        // (indices/slots num_entries..2*num_entries).
        for (i, b) in shared_blobs.iter().enumerate() {
            let mut w_b = b.write().unwrap();
            w_b.set_index(num_entries + i as u64).unwrap();
            w_b.set_slot(num_entries + i as u64).unwrap();
        }
        // Second batch must append cleanly where the first left off.
        db_ledger
            .write_consecutive_blobs(&shared_blobs)
            .expect("Expect successful blob writes");
        let meta = db_ledger
            .meta_cf
            .get(&db_ledger.db, &meta_key)
            .unwrap()
            .unwrap();
        assert_eq!(meta.consumed, 2 * num_entries);
        assert_eq!(meta.received, 2 * num_entries);
        assert_eq!(meta.consumed_slot, 2 * num_entries - 1);
        assert_eq!(meta.received_slot, 2 * num_entries - 1);
    }
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_genesis_and_entry_iterator() {
// Create RocksDb ledger

View File

@ -310,8 +310,7 @@ pub fn process_blob(
)?;
vec![]
} else {
let data_key = DataCf::key(slot, pix);
db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
db_ledger.insert_data_blobs(vec![&*blob.read().unwrap()])?
};
#[cfg(feature = "erasure")]

View File

@ -450,6 +450,7 @@ mod tests {
let mut bank = Bank::new(&alice);
let bob_pubkey = Keypair::new().pubkey();
let ledger_path = create_tmp_ledger_with_mint("thin_client", &alice);
let entry_height = alice.create_entries().len() as u64;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id,
@ -461,7 +462,7 @@ mod tests {
leader_keypair,
vote_account_keypair,
bank,
0,
entry_height,
&last_id,
leader,
None,