From cdf1a96e23920f91ed48c4837e1bd50e221ccf39 Mon Sep 17 00:00:00 2001
From: Greg Fitzgerald
Date: Fri, 9 Nov 2018 20:12:14 -0700
Subject: [PATCH] Revert "V1 Window/Ledger based on RocksDb (#1712)"

This reverts commit bfcdec95cb6ae7f21f427d1b660e794bff806153.
---
 Cargo.toml                |   1 -
 ci/buildkite.yml          |   4 +-
 ci/docker-rust/Dockerfile |   1 -
 ci/version-check.sh       |   4 +-
 src/db_ledger.rs          | 534 --------------------------------------
 src/lib.rs                |   2 -
 src/result.rs             |   7 -
 7 files changed, 4 insertions(+), 549 deletions(-)
 delete mode 100644 src/db_ledger.rs

diff --git a/Cargo.toml b/Cargo.toml
index d450a32b1..98be14250 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -98,7 +98,6 @@ rand = "0.5.1"
 rayon = "1.0.0"
 reqwest = "0.9.0"
 ring = "0.13.2"
-rocksdb = "0.10.1"
 sha2 = "0.8.0"
 serde = "1.0.27"
 serde_cbor = "0.9.0"
diff --git a/ci/buildkite.yml b/ci/buildkite.yml
index 9bb128d27..fc5bbcb1a 100644
--- a/ci/buildkite.yml
+++ b/ci/buildkite.yml
@@ -4,7 +4,7 @@ steps:
     env:
       CARGO_TARGET_CACHE_NAME: "stable"
     timeout_in_minutes: 30
-  - command: "ci/docker-run.sh solanalabs/rust-nightly:2018-11-07 ci/test-bench.sh"
+  - command: "ci/docker-run.sh solanalabs/rust-nightly:2018-10-04 ci/test-bench.sh"
     name: "bench [public]"
     env:
       CARGO_TARGET_CACHE_NAME: "nightly"
@@ -12,7 +12,7 @@
   - command: "ci/shellcheck.sh"
     name: "shellcheck [public]"
     timeout_in_minutes: 20
-  - command: "ci/docker-run.sh solanalabs/rust-nightly:2018-11-07 ci/test-nightly.sh"
+  - command: "ci/docker-run.sh solanalabs/rust-nightly:2018-10-04 ci/test-nightly.sh"
     name: "nightly [public]"
    env:
       CARGO_TARGET_CACHE_NAME: "nightly"
diff --git a/ci/docker-rust/Dockerfile b/ci/docker-rust/Dockerfile
index c96f76f8b..b3f6d0931 100644
--- a/ci/docker-rust/Dockerfile
+++ b/ci/docker-rust/Dockerfile
@@ -12,7 +12,6 @@ RUN set -x && \
     apt update && \
     apt install -y \
     buildkite-agent \
-    clang-7 \
     cmake \
     lcov \
     libclang-common-7-dev \
diff --git a/ci/version-check.sh b/ci/version-check.sh
index 97eb6ffa2..daad013d5 100755
--- a/ci/version-check.sh
+++ b/ci/version-check.sh
@@ -19,8 +19,8 @@ require() {
 
 case ${1:-stable} in
 nightly)
-  require rustc 1.32.[0-9]+-nightly
-  require cargo 1.32.[0-9]+-nightly
+  require rustc 1.31.[0-9]+-nightly
+  require cargo 1.31.[0-9]+-nightly
   ;;
 stable)
   require rustc 1.30.[0-9]+
diff --git a/src/db_ledger.rs b/src/db_ledger.rs
deleted file mode 100644
index da6065d5a..000000000
--- a/src/db_ledger.rs
+++ /dev/null
@@ -1,534 +0,0 @@
-//! The `ledger` module provides functions for parallel verification of the
-//! Proof of History ledger as well as iterative read, append write, and random
-//! access read to a persistent file-based ledger.
-
-use bincode::{deserialize, serialize};
-use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
-use packet::Blob;
-use result::{Error, Result};
-use rocksdb::{ColumnFamily, Options, WriteBatch, DB};
-use serde::de::DeserializeOwned;
-use serde::Serialize;
-use std::io;
-
-pub trait LedgerColumnFamily {
-    type ValueType: DeserializeOwned + Serialize;
-
-    fn get(&self, db: &DB, key: &[u8]) -> Result<Option<Self::ValueType>> {
-        let data_bytes = db.get_cf(self.handle(), key)?;
-
-        if let Some(raw) = data_bytes {
-            let result: Self::ValueType = deserialize(&raw)?;
-            Ok(Some(result))
-        } else {
-            Ok(None)
-        }
-    }
-
-    fn get_bytes(&self, db: &DB, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        let data_bytes = db.get_cf(self.handle(), key)?;
-        Ok(data_bytes.map(|x| x.to_vec()))
-    }
-
-    fn put_bytes(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> {
-        db.put_cf(self.handle(), &key, &serialized_value)?;
-        Ok(())
-    }
-
-    fn put(&self, db: &DB, key: &[u8], value: &Self::ValueType) -> Result<()> {
-        let serialized = serialize(value)?;
-        db.put_cf(self.handle(), &key, &serialized)?;
-        Ok(())
-    }
-
-    fn handle(&self) -> ColumnFamily;
-}
-
-#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
-// The Meta column family
-pub struct SlotMeta {
-    pub consumed: u64,
-    pub received: u64,
-}
-
-impl SlotMeta {
-    fn new() -> Self {
-        SlotMeta {
-            consumed: 0,
-            received: 0,
-        }
-    }
-}
-
-pub struct MetaCf {
-    handle: ColumnFamily,
-}
-
-impl MetaCf {
-    pub fn new(handle: ColumnFamily) -> Self {
-        MetaCf { handle }
-    }
-
-    pub fn key(slot_height: u64) -> Vec<u8> {
-        let mut key = vec![0u8; 8];
-        LittleEndian::write_u64(&mut key[0..8], slot_height);
-        key
-    }
-}
-
-impl LedgerColumnFamily for MetaCf {
-    type ValueType = SlotMeta;
-
-    fn handle(&self) -> ColumnFamily {
-        self.handle
-    }
-}
-
-// The data column family
-pub struct DataCf {
-    handle: ColumnFamily,
-}
-
-impl DataCf {
-    pub fn new(handle: ColumnFamily) -> Self {
-        DataCf { handle }
-    }
-
-    pub fn key(slot_height: u64, index: u64) -> Vec<u8> {
-        let mut key = vec![0u8; 16];
-        LittleEndian::write_u64(&mut key[0..8], slot_height);
-        LittleEndian::write_u64(&mut key[8..16], index);
-        key
-    }
-
-    pub fn slot_height_from_key(key: &[u8]) -> Result<u64> {
-        let mut rdr = io::Cursor::new(&key[0..8]);
-        let height = rdr.read_u64::<LittleEndian>()?;
-        Ok(height)
-    }
-
-    pub fn index_from_key(key: &[u8]) -> Result<u64> {
-        let mut rdr = io::Cursor::new(&key[8..16]);
-        let index = rdr.read_u64::<LittleEndian>()?;
-        Ok(index)
-    }
-}
-
-impl LedgerColumnFamily for DataCf {
-    type ValueType = Vec<u8>;
-
-    fn handle(&self) -> ColumnFamily {
-        self.handle
-    }
-}
-
-// The erasure column family
-pub struct ErasureCf {
-    handle: ColumnFamily,
-}
-
-impl ErasureCf {
-    pub fn new(handle: ColumnFamily) -> Self {
-        ErasureCf { handle }
-    }
-
-    pub fn key(slot_height: u64, index: u64) -> Vec<u8> {
-        DataCf::key(slot_height, index)
-    }
-}
-
-impl LedgerColumnFamily for ErasureCf {
-    type ValueType = Vec<u8>;
-
-    fn handle(&self) -> ColumnFamily {
-        self.handle
-    }
-}
-
-// ledger window
-pub struct DbLedger {
-    // Underlying database is automatically closed in the Drop implementation of DB
-    db: DB,
-    meta_cf: MetaCf,
-    data_cf: DataCf,
-    #[allow(dead_code)]
-    erasure_cf: ErasureCf,
-}
-
-// TODO: Once we support a window that knows about different leader
-// slots, change functions where this is used to take slot height
-// as a variable argument
-pub const DEFAULT_SLOT_HEIGHT: u64 = 0;
-// Column family for metadata about a leader slot
-pub const META_CF: &str = "meta";
-// Column family for the data in a leader slot
-pub const DATA_CF: &str = "data";
"data"; -// Column family for erasure data -pub const ERASURE_CF: &str = "erasure"; - -impl DbLedger { - // Opens a Ledger in directory, provides "infinite" window of blobs - pub fn open(ledger_path: &str) -> Result { - // Use default database options - let mut options = Options::default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - - // Column family names - let cfs = vec![META_CF, DATA_CF, ERASURE_CF]; - - // Open the database - let db = DB::open_cf(&options, ledger_path, &cfs)?; - - // Create the metadata column family - let meta_cf_handle = db.cf_handle(META_CF).unwrap(); - let meta_cf = MetaCf::new(meta_cf_handle); - - // Create the data column family - let data_cf_handle = db.cf_handle(DATA_CF).unwrap(); - let data_cf = DataCf::new(data_cf_handle); - - // Create the erasure column family - let erasure_cf_handle = db.cf_handle(ERASURE_CF).unwrap(); - let erasure_cf = ErasureCf::new(erasure_cf_handle); - - Ok(DbLedger { - db, - meta_cf, - data_cf, - erasure_cf, - }) - } - - pub fn write_blobs<'a, I>(&mut self, start_index: u64, blobs: &'a I) -> Result<()> - where - &'a I: IntoIterator, - { - for (blob, index) in blobs.into_iter().zip(start_index..) { - let key = DataCf::key(DEFAULT_SLOT_HEIGHT, index); - let size = blob.get_size()?; - self.data_cf.put_bytes(&self.db, &key, &blob.data[..size])?; - } - Ok(()) - } - - pub fn process_new_blob(&self, key: &[u8], new_blob: &Blob) -> Result>> { - let slot_height = DataCf::slot_height_from_key(key)?; - let meta_key = MetaCf::key(slot_height); - - let mut should_write_meta = false; - - let mut meta = { - if let Some(meta) = self.db.get_cf(self.meta_cf.handle(), &meta_key)? { - deserialize(&meta)? - } else { - should_write_meta = true; - SlotMeta::new() - } - }; - - let mut index = DataCf::index_from_key(key)?; - if index > meta.received { - meta.received = index; - should_write_meta = true; - } - - let mut consumed_queue = vec![]; - let serialized_blob_data = new_blob.data; - - if meta.consumed == index { - should_write_meta = true; - meta.consumed += 1; - consumed_queue.push(new_blob.data.to_vec()); - // Find the next consecutive block of blobs. - // TODO: account for consecutive blocks that - // span multiple slots - loop { - index += 1; - let key = DataCf::key(slot_height, index); - if let Some(blob_data) = self.data_cf.get(&self.db, &key)? 
-                    consumed_queue.push(blob_data);
-                    meta.consumed += 1;
-                } else {
-                    break;
-                }
-            }
-        }
-
-        // Atomically write both the metadata and the data
-        let mut batch = WriteBatch::default();
-        if should_write_meta {
-            batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?;
-        }
-
-        batch.put_cf(self.data_cf.handle(), key, &serialized_blob_data)?;
-        self.db.write(batch)?;
-
-        Ok(consumed_queue)
-    }
-
-    // Fill 'buf' with num_blobs or the maximum number of consecutive
-    // whole blobs that fit into buf.len()
-    //
-    // Return a tuple of (number of blobs read, total size of blobs read)
-    pub fn get_blob_bytes(
-        &mut self,
-        start_index: u64,
-        num_blobs: u64,
-        buf: &mut [u8],
-    ) -> Result<(u64, u64)> {
-        let start_key = DataCf::key(DEFAULT_SLOT_HEIGHT, start_index);
-        let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?;
-        db_iterator.seek(&start_key);
-        let mut total_blobs = 0;
-        let mut total_current_size = 0;
-        for expected_index in start_index..start_index + num_blobs {
-            if !db_iterator.valid() {
-                if expected_index == start_index {
-                    return Err(Error::IO(io::Error::new(
-                        io::ErrorKind::NotFound,
-                        "Blob at start_index not found",
-                    )));
-                } else {
-                    break;
-                }
-            }
-
-            // Check key is the next sequential key based on
-            // blob index
-            let key = &db_iterator.key();
-
-            if key.is_none() {
-                break;
-            }
-
-            let key = key.as_ref().unwrap();
-
-            let index = DataCf::index_from_key(key)?;
-
-            if index != expected_index {
-                break;
-            }
-
-            // Get the blob data
-            let value = &db_iterator.value();
-
-            if value.is_none() {
-                break;
-            }
-
-            let value = value.as_ref().unwrap();
-            let blob_data_len = value.len();
-
-            if total_current_size + blob_data_len > buf.len() {
-                break;
-            }
-
-            buf[total_current_size..total_current_size + value.len()].copy_from_slice(value);
-            total_current_size += blob_data_len;
-            total_blobs += 1;
-
-            // TODO: Change this logic to support looking for data
-            // that spans multiple leader slots, once we support
-            // a window that knows about different leader slots
-            db_iterator.next();
-        }
-
-        Ok((total_blobs, total_current_size as u64))
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
-    use rocksdb::{Options, DB};
-
-    #[test]
-    fn test_put_get_simple() {
-        let ledger_path = get_tmp_ledger_path("test_put_get_simple");
-        let ledger = DbLedger::open(&ledger_path).unwrap();
-
-        // Test meta column family
-        let meta = SlotMeta::new();
-        let meta_key = MetaCf::key(0);
-        ledger.meta_cf.put(&ledger.db, &meta_key, &meta).unwrap();
-        let result = ledger
-            .meta_cf
-            .get(&ledger.db, &meta_key)
-            .unwrap()
-            .expect("Expected meta object to exist");
-
-        assert_eq!(result, meta);
-
-        // Test erasure column family
-        let erasure = vec![1u8; 16];
-        let erasure_key = ErasureCf::key(DEFAULT_SLOT_HEIGHT, 0);
-        ledger
-            .erasure_cf
-            .put(&ledger.db, &erasure_key, &erasure)
-            .unwrap();
-
-        let result = ledger
-            .erasure_cf
-            .get(&ledger.db, &erasure_key)
-            .unwrap()
-            .expect("Expected erasure object to exist");
-
-        assert_eq!(result, erasure);
-
-        // Test data column family
-        let data = vec![2u8; 16];
-        let data_key = DataCf::key(DEFAULT_SLOT_HEIGHT, 0);
-        ledger.data_cf.put(&ledger.db, &data_key, &data).unwrap();
-
-        let result = ledger
-            .data_cf
-            .get(&ledger.db, &data_key)
-            .unwrap()
-            .expect("Expected data object to exist");
-
-        assert_eq!(result, data);
-
-        // Destroying database without closing it first is undefined behavior
-        drop(ledger);
-        let _ignored = DB::destroy(&Options::default(), &ledger_path);
-    }
-
-    #[test]
-    fn test_get_blobs_bytes() {
-        let shared_blobs = make_tiny_test_entries(10).to_blobs();
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-
-        let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes");
-        let mut ledger = DbLedger::open(&ledger_path).unwrap();
-        ledger.write_blobs(0, &blobs).unwrap();
-
-        let mut buf = [0; 1024];
-        let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf).unwrap();
-        let bytes = bytes as usize;
-        assert_eq!(num_blobs, 1);
-        {
-            let blob_data = &buf[..bytes];
-            assert_eq!(blob_data, &blobs[0].data[..bytes]);
-        }
-
-        let (num_blobs, bytes2) = ledger.get_blob_bytes(0, 2, &mut buf).unwrap();
-        let bytes2 = bytes2 as usize;
-        assert_eq!(num_blobs, 2);
-        assert!(bytes2 > bytes);
-        {
-            let blob_data_1 = &buf[..bytes];
-            assert_eq!(blob_data_1, &blobs[0].data[..bytes]);
-
-            let blob_data_2 = &buf[bytes..bytes2];
-            assert_eq!(blob_data_2, &blobs[1].data[..bytes2 - bytes]);
-        }
-
-        // buf size part-way into blob[1], should just return blob[0]
-        let mut buf = vec![0; bytes + 1];
-        let (num_blobs, bytes3) = ledger.get_blob_bytes(0, 2, &mut buf).unwrap();
-        assert_eq!(num_blobs, 1);
-        let bytes3 = bytes3 as usize;
-        assert_eq!(bytes3, bytes);
-
-        let mut buf = vec![0; bytes2 - 1];
-        let (num_blobs, bytes4) = ledger.get_blob_bytes(0, 2, &mut buf).unwrap();
-        assert_eq!(num_blobs, 1);
-        let bytes4 = bytes4 as usize;
-        assert_eq!(bytes4, bytes);
-
-        let mut buf = vec![0; bytes * 2];
-        let (num_blobs, bytes6) = ledger.get_blob_bytes(9, 1, &mut buf).unwrap();
-        assert_eq!(num_blobs, 1);
-        let bytes6 = bytes6 as usize;
-
-        {
-            let blob_data = &buf[..bytes6];
-            assert_eq!(blob_data, &blobs[9].data[..bytes6]);
-        }
-
-        // Read out of range
-        assert!(ledger.get_blob_bytes(20, 2, &mut buf).is_err());
-
-        // Destroying database without closing it first is undefined behavior
-        drop(ledger);
-        let _ignored = DB::destroy(&Options::default(), &ledger_path);
-    }
-
-    #[test]
-    fn test_process_new_blobs_basic() {
-        let shared_blobs = make_tiny_test_entries(2).to_blobs();
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-
-        let ledger_path = get_tmp_ledger_path("test_process_new_blobs_basic");
-        let ledger = DbLedger::open(&ledger_path).unwrap();
-
-        // Insert the second blob; we're missing the first blob, so nothing should be returned
-        let result = ledger
-            .process_new_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, 1), blobs[1])
-            .unwrap();
-
-        assert!(result.len() == 0);
-        let meta = ledger
-            .meta_cf
-            .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
-            .unwrap()
-            .expect("Expected new metadata object to be created");
-        assert!(meta.consumed == 0 && meta.received == 1);
-
-        // Insert first blob, check for consecutive returned blobs
-        let result = ledger
-            .process_new_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, 0), blobs[0])
-            .unwrap();
-
-        assert!(result.len() == 2);
-        let meta = ledger
-            .meta_cf
-            .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
-            .unwrap()
-            .expect("Expected new metadata object to exist");
-        assert!(meta.consumed == 2 && meta.received == 1);
-
-        // Destroying database without closing it first is undefined behavior
-        drop(ledger);
-        let _ignored = DB::destroy(&Options::default(), &ledger_path);
-    }
-
-    #[test]
-    fn test_process_new_blobs_multiple() {
-        let num_blobs = 10;
-        let shared_blobs = make_tiny_test_entries(num_blobs).to_blobs();
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-
-        let ledger_path = get_tmp_ledger_path("test_process_new_blobs_multiple");
-        let ledger = DbLedger::open(&ledger_path).unwrap();
-
-        // Insert blobs in reverse order; the full run of consecutive blobs is
-        // returned only once the first blob (index 0) is inserted
-        for i in (0..num_blobs).rev() {
-            let result = ledger
-                .process_new_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[0])
-                .unwrap();
-
-            let meta = ledger
-                .meta_cf
-                .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
-                .unwrap()
-                .expect("Expected metadata object to exist");
-            if i != 0 {
-                assert_eq!(result.len(), 0);
-                assert!(meta.consumed == 0 && meta.received == num_blobs as u64 - 1);
-            } else {
-                assert_eq!(result.len(), num_blobs);
-                assert!(meta.consumed == num_blobs as u64 && meta.received == num_blobs as u64 - 1);
-            }
-        }
-
-        // Destroying database without closing it first is undefined behavior
-        drop(ledger);
-        let _ignored = DB::destroy(&Options::default(), &ledger_path);
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index 56553ab73..7f2daa5b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -27,7 +27,6 @@ pub mod client;
 pub mod cluster_info;
 pub mod budget_program;
 pub mod compute_leader_finality_service;
-pub mod db_ledger;
 pub mod drone;
 pub mod entry;
 #[cfg(feature = "erasure")]
@@ -105,7 +104,6 @@ extern crate pnet_datalink;
 extern crate rayon;
 extern crate reqwest;
 extern crate ring;
-extern crate rocksdb;
 extern crate serde;
 #[macro_use]
 extern crate serde_derive;
diff --git a/src/result.rs b/src/result.rs
index 20f0789a7..bb63cae72 100644
--- a/src/result.rs
+++ b/src/result.rs
@@ -7,7 +7,6 @@ use cluster_info;
 use erasure;
 use packet;
 use poh_recorder;
-use rocksdb;
 use serde_json;
 use std;
 use std::any::Any;
@@ -30,7 +29,6 @@ pub enum Error {
     SendError,
     PohRecorderError(poh_recorder::PohRecorderError),
     VoteError(vote_stage::VoteError),
-    RocksDb(rocksdb::Error),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -109,11 +107,6 @@ impl std::convert::From<vote_stage::VoteError> for Error {
         Error::VoteError(e)
     }
 }
-impl std::convert::From<rocksdb::Error> for Error {
-    fn from(e: rocksdb::Error) -> Error {
-        Error::RocksDb(e)
-    }
-}
 
 #[cfg(test)]
 mod tests {
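For reference, a minimal standalone sketch of the storage layout the deleted DbLedger wrapped. It uses only rocksdb 0.10 and byteorder calls that already appear in the removed module (DB::open_cf, cf_handle, put_cf, get_cf, LittleEndian::write_u64, DB::destroy); the scratch path and sample value are hypothetical, not taken from the patch.

// Sketch of the removed module's column-family layout, under the
// assumptions stated above.
extern crate byteorder;
extern crate rocksdb;

use byteorder::{ByteOrder, LittleEndian};
use rocksdb::{Options, DB};

fn main() {
    let path = "/tmp/db_ledger_sketch"; // hypothetical scratch directory

    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    // The same three column families the deleted module declared
    let db = DB::open_cf(&options, path, &["meta", "data", "erasure"]).unwrap();

    // Data keys are 16 bytes: little-endian slot_height, then blob index,
    // mirroring DataCf::key(DEFAULT_SLOT_HEIGHT, index)
    let mut key = vec![0u8; 16];
    LittleEndian::write_u64(&mut key[0..8], 0); // DEFAULT_SLOT_HEIGHT
    LittleEndian::write_u64(&mut key[8..16], 7); // blob index

    let data_cf = db.cf_handle("data").unwrap();
    db.put_cf(data_cf, &key, b"blob bytes").unwrap();

    let stored = db.get_cf(data_cf, &key).unwrap().unwrap();
    assert_eq!(&stored[..], &b"blob bytes"[..]);

    // Destroying the database without closing it first is undefined behavior
    drop(db);
    let _ignored = DB::destroy(&Options::default(), path);
}

Note that little-endian keys do not sort numerically under RocksDB's default lexicographic comparator, which is why get_blob_bytes above re-checks each decoded index against expected_index before copying rather than trusting iterator order alone.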