From d90b8c331d3daca8d1e6f2dd1d8945d8e380fd3c Mon Sep 17 00:00:00 2001 From: "Mark E. Sinclair" <48664490+mark-solana@users.noreply.github.com> Date: Tue, 2 Apr 2019 16:58:07 -0500 Subject: [PATCH] Refactor blocktree storage abstraction (#3588) --- core/src/blocktree.rs | 342 +++++++++++++++------------ core/src/blocktree/db.rs | 414 ++++++++++++++++++++++++++------ core/src/blocktree/kvs.rs | 299 +++++++++--------------- core/src/blocktree/rocks.rs | 454 +++++++++++++----------------------- 4 files changed, 811 insertions(+), 698 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 8b0af6d687..935a0cbe1f 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -8,7 +8,7 @@ use crate::result::{Error, Result}; #[cfg(feature = "kvstore")] use solana_kvstore as kvstore; -use bincode::{deserialize, serialize}; +use bincode::deserialize; use hashbrown::HashMap; @@ -36,16 +36,19 @@ macro_rules! db_imports { { $mod:ident, $db:ident, $db_path:expr } => { mod $mod; - pub use db::{ - Cursor, Database, IndexColumn, IWriteBatch, LedgerColumnFamily, - LedgerColumnFamilyRaw, - }; + use $mod::$db; + use db::columns as cf; + + pub use db::columns; + + pub type Database = db::Database<$db>; + pub type Cursor = db::Cursor<$db, C>; + pub type LedgerColumn = db::LedgerColumn<$db, C>; + pub type WriteBatch = db::WriteBatch<$db>; + + pub trait Column: db::Column<$db> {} + impl> Column for C {} - pub use $mod::{$db, ErasureCf, MetaCf, DataCf, DetachedHeadsCf}; - pub type BlocktreeRawIterator = <$db as Database>::Cursor; - pub type WriteBatch = <$db as Database>::WriteBatch; - pub type OwnedKey = <$db as Database>::OwnedKey; - pub type Key = <$db as Database>::Key; pub const BLOCKTREE_DIRECTORY: &str = $db_path; }; } @@ -117,15 +120,11 @@ impl SlotMeta { // ledger window pub struct Blocktree { - // Underlying database is automatically closed in the Drop implementation of DB - #[cfg(not(feature = "kvstore"))] - db: Arc, - #[cfg(feature = "kvstore")] - db: 
Arc, - meta_cf: MetaCf, - data_cf: DataCf, - erasure_cf: ErasureCf, - detached_heads_cf: DetachedHeadsCf, + db: Arc, + meta_cf: LedgerColumn, + data_cf: LedgerColumn, + erasure_cf: LedgerColumn, + detached_heads_cf: LedgerColumn, pub new_blobs_signals: Vec>, } @@ -139,6 +138,38 @@ pub const ERASURE_CF: &str = "erasure"; pub const DETACHED_HEADS_CF: &str = "detached_heads"; impl Blocktree { + /// Opens a Ledger in directory, provides "infinite" window of blobs + pub fn open(ledger_path: &str) -> Result { + use std::path::Path; + + fs::create_dir_all(&ledger_path)?; + let ledger_path = Path::new(&ledger_path).join(BLOCKTREE_DIRECTORY); + + // Open the database + let db = Arc::new(Database::open(&ledger_path)?); + + // Create the metadata column family + let meta_cf = LedgerColumn::new(&db); + + // Create the data column family + let data_cf = LedgerColumn::new(&db); + + // Create the erasure column family + let erasure_cf = LedgerColumn::new(&db); + + // Create the detached heads column family + let detached_heads_cf = LedgerColumn::new(&db); + + Ok(Blocktree { + db, + meta_cf, + data_cf, + erasure_cf, + detached_heads_cf, + new_blobs_signals: vec![], + }) + } + pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver)> { let mut blocktree = Self::open(ledger_path)?; let (signal_sender, signal_receiver) = sync_channel(1); @@ -147,37 +178,43 @@ impl Blocktree { Ok((blocktree, signal_receiver)) } + pub fn destroy(ledger_path: &str) -> Result<()> { + // Database::destroy() fails if the path doesn't exist + fs::create_dir_all(ledger_path)?; + let path = std::path::Path::new(ledger_path).join(BLOCKTREE_DIRECTORY); + Database::destroy(&path) + } + pub fn meta(&self, slot: u64) -> Result> { - self.meta_cf.get(&MetaCf::key(&slot)) + self.meta_cf.get(slot) } pub fn detached_head(&self, slot: u64) -> Result> { - self.detached_heads_cf.get(&DetachedHeadsCf::key(&slot)) + self.detached_heads_cf.get(slot) } pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> {
- let meta_key = MetaCf::key(&slot); - if let Some(mut meta) = self.meta_cf.get(&meta_key)? { + if let Some(mut meta) = self.meta_cf.get(slot)? { for index in 0..meta.received { - self.data_cf.delete_by_index(&(slot, index))?; + self.data_cf.delete((slot, index))?; } meta.consumed = 0; meta.received = 0; meta.last_index = std::u64::MAX; meta.next_slots = vec![]; - self.meta_cf.put(&meta_key, &meta)?; + self.meta_cf.put(slot, &meta)?; } Ok(()) } pub fn get_next_slot(&self, slot: u64) -> Result> { - let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?; - db_iterator.seek(&MetaCf::key(&(slot + 1))); + let mut db_iterator = self.db.cursor::()?; + db_iterator.seek(slot + 1); if !db_iterator.valid() { Ok(None) } else { - let key = &db_iterator.key().expect("Expected valid key"); - Ok(Some(MetaCf::index(&key))) + let next_slot = db_iterator.key().expect("Expected valid key"); + Ok(Some(next_slot)) } } @@ -328,11 +365,7 @@ impl Blocktree { // Check if the working copy of the metadata has changed if Some(meta) != meta_backup.as_ref() { should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup); - write_batch.put_cf( - self.meta_cf.handle(), - &MetaCf::key(slot), - &serialize(&meta)?, - )?; + write_batch.put::(*slot, &meta)?; } } @@ -357,9 +390,8 @@ impl Blocktree { buf: &mut [u8], slot: u64, ) -> Result<(u64, u64)> { - let start_key = DataCf::key(&(slot, start_index)); - let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; - db_iterator.seek(&start_key); + let mut db_iterator = self.db.cursor::()?; + db_iterator.seek((slot, start_index)); let mut total_blobs = 0; let mut total_current_size = 0; for expected_index in start_index..start_index + num_blobs { @@ -376,14 +408,13 @@ impl Blocktree { // Check key is the next sequential key based on // blob index - let key = &db_iterator.key().expect("Expected valid key"); - let index = DataCf::index(key).1; + let (_, index) = db_iterator.key().expect("Expected valid key"); if index
!= expected_index { break; } // Get the blob data - let value = &db_iterator.value(); + let value = &db_iterator.value_bytes(); if value.is_none() { break; @@ -410,24 +441,24 @@ impl Blocktree { } pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { - self.erasure_cf.get_by_index(&(slot, index)) + self.erasure_cf.get_bytes((slot, index)) } pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { - self.erasure_cf.delete_by_index(&(slot, index)) + self.erasure_cf.delete((slot, index)) } pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result>> { - self.data_cf.get_by_index(&(slot, index)) + self.data_cf.get_bytes((slot, index)) } pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.erasure_cf.put_by_index(&(slot, index), bytes) + self.erasure_cf.put_bytes((slot, index), bytes) } - pub fn put_data_raw(&self, key: &Key, value: &[u8]) -> Result<()> { - self.data_cf.put_bytes(key, value) + pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> { + self.data_cf.put_bytes((slot, index), value) } pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.data_cf.put_by_index(&(slot, index), bytes) + self.data_cf.put_bytes((slot, index), bytes) } pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result> { @@ -452,16 +483,16 @@ impl Blocktree { // Given a start and end entry index, find all the missing // indexes in the ledger in the range [start_index, end_index) // for the slot with the specified slot - fn find_missing_indexes( - db_iterator: &mut BlocktreeRawIterator, + fn find_missing_indexes( + db_iterator: &mut Cursor, slot: u64, start_index: u64, end_index: u64, - key: &dyn Fn(u64, u64) -> OwnedKey, - slot_from_key: &dyn Fn(&Key) -> u64, - index_from_key: &dyn Fn(&Key) -> u64, max_missing: usize, - ) -> Vec { + ) -> Vec + where + C: Column, + { if start_index >= end_index || max_missing == 0 { return vec![]; } @@ 
-469,7 +500,7 @@ impl Blocktree { let mut missing_indexes = vec![]; // Seek to the first blob with index >= start_index - db_iterator.seek(&key(slot, start_index)); + db_iterator.seek((slot, start_index)); // The index of the first missing blob in the slot let mut prev_index = start_index; @@ -483,13 +514,12 @@ impl Blocktree { } break; } - let current_key = db_iterator.key().expect("Expect a valid key"); - let current_slot = slot_from_key(&current_key); + let (current_slot, index) = db_iterator.key().expect("Expect a valid key"); let current_index = { if current_slot > slot { end_index } else { - index_from_key(&current_key) + index } }; let upper_index = cmp::min(current_index, end_index); @@ -523,18 +553,11 @@ impl Blocktree { end_index: u64, max_missing: usize, ) -> Vec { - let mut db_iterator = self.data_cf.raw_iterator(); - - Self::find_missing_indexes( - &mut db_iterator, - slot, - start_index, - end_index, - &|slot, index| DataCf::key(&(slot, index)), - &MetaCf::index, - &|key| DataCf::index(key).1, - max_missing, - ) + if let Ok(mut db_iterator) = self.data_cf.cursor() { + Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing) + } else { + vec![] + } } pub fn find_missing_coding_indexes( @@ -544,18 +567,11 @@ impl Blocktree { end_index: u64, max_missing: usize, ) -> Vec { - let mut db_iterator = self.erasure_cf.raw_iterator(); - - Self::find_missing_indexes( - &mut db_iterator, - slot, - start_index, - end_index, - &|slot, index| ErasureCf::key(&(slot, index)), - &MetaCf::index, - &|key| ErasureCf::index(key).1, - max_missing, - ) + if let Ok(mut db_iterator) = self.erasure_cf.cursor() { + Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing) + } else { + vec![] + } } /// Returns the entry vector for the slot starting with `blob_start_index` @@ -569,6 +585,77 @@ impl Blocktree { .map(|x| x.0) } + pub fn read_ledger_blobs(&self) -> impl Iterator { + self.data_cf + .iter() + .unwrap() + .map(|(_,
blob_data)| Blob::new(&blob_data)) + } + + /// Return an iterator for all the entries in the given file. + pub fn read_ledger(&self) -> Result> { + use crate::entry::EntrySlice; + use std::collections::VecDeque; + + struct EntryIterator { + db_iterator: Cursor, + + // TODO: remove me when replay_stage is iterating by block (Blocktree) + // this verification is duplicating that of replay_stage, which + // can do this in parallel + blockhash: Option, + // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 + // rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash + // when the db_iterator member above is dropped. + // _blocktree is unused, but dropping _blocktree results in a broken db_iterator + // you have to hold the database open in order to iterate over it, and in order + // for db_iterator to be able to run Drop + // _blocktree: Blocktree, + entries: VecDeque, + } + + impl Iterator for EntryIterator { + type Item = Entry; + + fn next(&mut self) -> Option { + if !self.entries.is_empty() { + return Some(self.entries.pop_front().unwrap()); + } + + if self.db_iterator.valid() { + if let Some(value) = self.db_iterator.value_bytes() { + if let Ok(next_entries) = + deserialize::>(&value[BLOB_HEADER_SIZE..]) + { + if let Some(blockhash) = self.blockhash { + if !next_entries.verify(&blockhash) { + return None; + } + } + self.db_iterator.next(); + if next_entries.is_empty() { + return None; + } + self.entries = VecDeque::from(next_entries); + let entry = self.entries.pop_front().unwrap(); + self.blockhash = Some(entry.hash); + return Some(entry); + } + } + } + None + } + } + let mut db_iterator = self.data_cf.cursor()?; + + db_iterator.seek_to_first(); + Ok(EntryIterator { + entries: VecDeque::new(), + db_iterator, + blockhash: None, + }) + } + pub fn get_slot_entries_with_blob_count( &self, slot: u64, @@ -648,7 +735,7 @@ impl Blocktree { // Write all the newly changed slots in new_chained_slots to the write_batch for (slot, meta) in 
new_chained_slots.iter() { let meta: &SlotMeta = &RefCell::borrow(&*meta); - write_batch.put_cf(self.meta_cf.handle(), &MetaCf::key(slot), &serialize(meta)?)?; + write_batch.put::(*slot, meta)?; } Ok(()) } @@ -692,21 +779,14 @@ impl Blocktree { ); if Self::is_detached_head(&RefCell::borrow(&*prev_slot_meta)) { - write_batch.put_cf( - self.detached_heads_cf.handle(), - &DetachedHeadsCf::key(&prev_slot), - &serialize(&true)?, - )?; + write_batch.put::(prev_slot, &true)?; } } } // At this point this slot has received a parent, so no longer a detached head if is_detached_head { - write_batch.delete_cf( - self.detached_heads_cf.handle(), - &DetachedHeadsCf::key(&slot), - )?; + write_batch.delete::(slot)?; } } @@ -883,12 +963,11 @@ impl Blocktree { } }; - let key = DataCf::key(&(blob_slot, blob_index)); let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only some of these changes going through. - write_batch.put_cf(self.data_cf.handle(), &key, serialized_blob_data)?; + write_batch.put_bytes::((blob_slot, blob_index), serialized_blob_data)?; prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data); // Index is zero-indexed, while the "received" height starts from 1, // so received = index + 1 for the same blob. @@ -927,7 +1006,7 @@ impl Blocktree { // Try to find the next blob we're looking for in the prev_inserted_blob_datas if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) { blobs.push(Cow::Borrowed(*prev_blob_data)); - } else if let Some(blob_data) = self.data_cf.get_by_index(&(slot, current_index))? { + } else if let Some(blob_data) = self.data_cf.get_bytes((slot, current_index))? 
{ // Try to find the next blob we're looking for in the database blobs.push(Cow::Owned(blob_data)); } else { @@ -944,7 +1023,6 @@ impl Blocktree { // don't count as ticks, even if they're empty entries fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> { // TODO: change bootstrap height to number of slots - let meta_key = MetaCf::key(&0); let mut bootstrap_meta = SlotMeta::new(0, 1); let last = blobs.last().unwrap(); @@ -953,15 +1031,10 @@ impl Blocktree { bootstrap_meta.is_connected = true; let mut batch = self.db.batch()?; - batch.put_cf( - self.meta_cf.handle(), - &meta_key, - &serialize(&bootstrap_meta)?, - )?; + batch.put::(0, &bootstrap_meta)?; for blob in blobs { - let key = DataCf::key(&(blob.slot(), blob.index())); let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; - batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; + batch.put_bytes::((blob.slot(), blob.index()), serialized_blob_datas)?; } self.db.write(batch)?; Ok(()) @@ -1077,7 +1150,6 @@ pub fn tmp_copy_blocktree(from: &str, name: &str) -> String { #[cfg(test)] pub mod tests { use super::*; - use crate::blocktree::db::Database; use crate::entry::{ create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice, }; @@ -1208,11 +1280,10 @@ pub mod tests { // Test meta column family let meta = SlotMeta::new(0, 1); - let meta_key = MetaCf::key(&0); - ledger.meta_cf.put(&meta_key, &meta).unwrap(); + ledger.meta_cf.put(0, &meta).unwrap(); let result = ledger .meta_cf - .get(&meta_key) + .get(0) .unwrap() .expect("Expected meta object to exist"); @@ -1220,15 +1291,12 @@ pub mod tests { // Test erasure column family let erasure = vec![1u8; 16]; - let erasure_key = ErasureCf::key(&(0, 0)); - ledger - .erasure_cf - .put_bytes(&erasure_key[..], &erasure) - .unwrap(); + let erasure_key = (0, 0); + ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap(); let result = ledger .erasure_cf - .get_bytes(&erasure_key[..]) + 
.get_bytes(erasure_key) .unwrap() .expect("Expected erasure object to exist"); @@ -1236,12 +1304,12 @@ pub mod tests { // Test data column family let data = vec![2u8; 16]; - let data_key = DataCf::key(&(0, 0)); - ledger.data_cf.put_bytes(&data_key, &data).unwrap(); + let data_key = (0, 0); + ledger.data_cf.put_bytes(data_key, &data).unwrap(); let result = ledger .data_cf - .get_bytes(&data_key) + .get_bytes(data_key) .unwrap() .expect("Expected data object to exist"); @@ -1336,7 +1404,7 @@ pub mod tests { let meta = ledger .meta_cf - .get(&MetaCf::key(&0)) + .get(0) .unwrap() .expect("Expected new metadata object to be created"); assert!(meta.consumed == 0 && meta.received == num_entries); @@ -1351,7 +1419,7 @@ pub mod tests { let meta = ledger .meta_cf - .get(&MetaCf::key(&0)) + .get(0) .unwrap() .expect("Expected new metadata object to exist"); assert_eq!(meta.consumed, num_entries); @@ -1381,7 +1449,7 @@ pub mod tests { let meta = ledger .meta_cf - .get(&MetaCf::key(&0)) + .get(0) .unwrap() .expect("Expected metadata object to exist"); assert_eq!(meta.parent_slot, 0); @@ -1429,16 +1497,15 @@ pub mod tests { let mut db_iterator = blocktree .db - .raw_iterator_cf(blocktree.data_cf.handle()) + .cursor::() .expect("Expected to be able to open database iterator"); - db_iterator.seek(&DataCf::key(&(slot, 1))); + db_iterator.seek((slot, 1)); // Iterate through ledger for i in 0..num_entries { assert!(db_iterator.valid()); - let current_key = db_iterator.key().expect("Expected a valid key"); - let current_index = DataCf::index(&current_key).1; + let (_, current_index) = db_iterator.key().expect("Expected a valid key"); assert_eq!(current_index, (1 as u64) << (i * 8)); db_iterator.next(); } @@ -1558,8 +1625,7 @@ pub mod tests { assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); - let meta_key = MetaCf::key(&slot); - let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); + let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); if num_entries % 2 ==
0 { assert_eq!(meta.received, num_entries); } else { @@ -1580,8 +1646,7 @@ pub mod tests { original_entries, ); - let meta_key = MetaCf::key(&slot); - let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); + let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); assert_eq!(meta.received, num_entries); assert_eq!(meta.consumed, num_entries); assert_eq!(meta.parent_slot, 0); @@ -1633,8 +1698,7 @@ pub mod tests { assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,); - let meta_key = MetaCf::key(&0); - let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); + let meta = blocktree.meta_cf.get(0).unwrap().unwrap(); assert_eq!(meta.consumed, num_unique_entries); assert_eq!(meta.received, num_unique_entries); assert_eq!(meta.parent_slot, 0); @@ -2096,17 +2160,14 @@ pub mod tests { assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty()); let mut meta0 = SlotMeta::new(0, 0); - blocktree - .meta_cf - .put_by_index(&0, &serialize(&meta0).unwrap()) - .unwrap(); + blocktree.meta_cf.put(0, &meta0).unwrap(); // Slot exists, chains to nothing let expected: HashMap> = HashMap::from_iter(vec![(0, vec![])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected); meta0.next_slots = vec![1, 2]; - blocktree.meta_cf.put(&MetaCf::key(&0), &meta0).unwrap(); + blocktree.meta_cf.put(0, &meta0).unwrap(); // Slot exists, chains to some other slots let expected: HashMap> = @@ -2116,10 +2177,7 @@ pub mod tests { let mut meta3 = SlotMeta::new(3, 1); meta3.next_slots = vec![10, 5]; - blocktree - .meta_cf - .put_by_index(&3, &serialize(&meta3).unwrap()) - .unwrap(); + blocktree.meta_cf.put(3, &meta3).unwrap(); let expected: HashMap> = HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected); @@ -2225,8 +2283,7 @@ pub mod tests { entries[i as usize] ); - let meta_key = MetaCf::key(&i); - let meta = 
blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); + let meta = blocktree.meta_cf.get(i).unwrap().unwrap(); assert_eq!(meta.received, i + 1); assert_eq!(meta.last_index, i); if i != 0 { @@ -2448,13 +2505,10 @@ pub mod tests { fn get_detached_heads(blocktree: &Blocktree) -> Vec { let mut results = vec![]; - let mut iter = blocktree - .db - .raw_iterator_cf(blocktree.detached_heads_cf.handle()) - .unwrap(); + let mut iter = blocktree.detached_heads_cf.cursor().unwrap(); iter.seek_to_first(); while iter.valid() { - results.push(DetachedHeadsCf::index(&iter.key().unwrap())); + results.push(iter.key().unwrap()); iter.next(); } results diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 723b196007..e31c7708bd 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -1,4 +1,3 @@ -use crate::entry::Entry; use crate::result::{Error, Result}; use bincode::{deserialize, serialize}; @@ -7,25 +6,54 @@ use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Borrow; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::path::Path; use std::sync::Arc; -pub trait Database: Sized + Send + Sync { - type Error: Into; - type Key: ?Sized; - type OwnedKey: Borrow; - type ColumnFamily; - type Cursor: Cursor; - type EntryIter: Iterator; - type WriteBatch: IWriteBatch; +pub mod columns { + #[derive(Debug)] + /// SlotMeta Column + pub struct SlotMeta; - fn cf_handle(&self, cf: &str) -> Option; + #[derive(Debug)] + /// DetachedHeads Column + pub struct DetachedHeads; + + #[derive(Debug)] + /// Erasure Column + pub struct Coding; + + #[derive(Debug)] + /// Data Column + pub struct Data; +} + +pub trait Backend: Sized + Send + Sync { + type Key: ?Sized + ToOwned; + type OwnedKey: Borrow; + type ColumnFamily: Clone; + type Cursor: DbCursor; + type Iter: Iterator, Box<[u8]>)>; + type WriteBatch: IWriteBatch; + type Error: Into; + + fn open(path: &Path) -> Result; + + fn columns(&self) -> Vec<&'static str>; + + fn destroy(path: 
&Path) -> Result<()>; + + fn cf_handle(&self, cf: &str) -> Self::ColumnFamily; fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result>>; - fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, data: &[u8]) -> Result<()>; + fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, value: &[u8]) -> Result<()>; fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>; + fn iterator_cf(&self, cf: Self::ColumnFamily) -> Result; + fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result; fn write(&self, batch: Self::WriteBatch) -> Result<()>; @@ -33,103 +61,343 @@ pub trait Database: Sized + Send + Sync { fn batch(&self) -> Result; } -pub trait Cursor { +pub trait Column +where + B: Backend, +{ + const NAME: &'static str; + type Index; + + fn key(index: Self::Index) -> B::OwnedKey; + fn index(key: &B::Key) -> Self::Index; +} + +pub trait DbCursor +where + B: Backend, +{ fn valid(&self) -> bool; - fn seek(&mut self, key: &D::Key); + fn seek(&mut self, key: &B::Key); fn seek_to_first(&mut self); fn next(&mut self); - fn key(&self) -> Option; + fn key(&self) -> Option; fn value(&self) -> Option>; } -pub trait IWriteBatch { - fn put_cf(&mut self, cf: D::ColumnFamily, key: &D::Key, data: &[u8]) -> Result<()>; +pub trait IWriteBatch +where + B: Backend, +{ + fn put_cf(&mut self, cf: B::ColumnFamily, key: &B::Key, value: &[u8]) -> Result<()>; + fn delete_cf(&mut self, cf: B::ColumnFamily, key: &B::Key) -> Result<()>; } -pub trait LedgerColumnFamily: LedgerColumnFamilyRaw { - type ValueType: DeserializeOwned + Serialize; +pub trait TypedColumn: Column +where + B: Backend, +{ + type Type: Serialize + DeserializeOwned; +} - fn get(&self, key: &D::Key) -> Result> { - let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key)?; +#[derive(Debug, Clone)] +pub struct Database +where + B: Backend, +{ + backend: B, +} - if let Some(raw) = data_bytes { - let result: Self::ValueType = deserialize(&raw)?; - Ok(Some(result)) +#[derive(Debug, 
Clone)] +pub struct Cursor +where + B: Backend, + C: Column, +{ + db_cursor: B::Cursor, + column: PhantomData, + backend: PhantomData, +} + +#[derive(Debug, Clone)] +pub struct LedgerColumn +where + B: Backend, + C: Column, +{ + pub db: Arc>, + column: PhantomData, +} + +#[derive(Debug)] +pub struct WriteBatch +where + B: Backend, +{ + write_batch: B::WriteBatch, + backend: PhantomData, + map: HashMap<&'static str, B::ColumnFamily>, +} + +impl Database +where + B: Backend, +{ + pub fn open(path: &Path) -> Result { + let backend = B::open(path)?; + + Ok(Database { backend }) + } + + pub fn destroy(path: &Path) -> Result<()> { + B::destroy(path)?; + + Ok(()) + } + + pub fn get_bytes(&self, key: C::Index) -> Result>> + where + C: Column, + { + self.backend + .get_cf(self.cf_handle::(), C::key(key).borrow()) + } + + pub fn put_bytes(&self, key: C::Index, data: &[u8]) -> Result<()> + where + C: Column, + { + self.backend + .put_cf(self.cf_handle::(), C::key(key).borrow(), data) + } + + pub fn delete(&self, key: C::Index) -> Result<()> + where + C: Column, + { + self.backend + .delete_cf(self.cf_handle::(), C::key(key).borrow()) + } + + pub fn get(&self, key: C::Index) -> Result> + where + C: TypedColumn, + { + if let Some(serialized_value) = self + .backend + .get_cf(self.cf_handle::(), C::key(key).borrow())? 
+ { + let value = deserialize(&serialized_value)?; + + Ok(Some(value)) } else { Ok(None) } } - fn put(&self, key: &D::Key, value: &Self::ValueType) -> Result<()> { - let db = self.db(); - let serialized = serialize(value)?; - db.put_cf(self.handle(), key, &serialized)?; - Ok(()) + pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> + where + C: TypedColumn, + { + let serialized_value = serialize(value)?; + + self.backend.put_cf( + self.cf_handle::(), + C::key(key).borrow(), + &serialized_value, + ) } - fn is_empty(&self) -> Result { - let mut db_iterator = self.db().raw_iterator_cf(self.handle())?; - db_iterator.seek_to_first(); - Ok(!db_iterator.valid()) + pub fn cursor(&self) -> Result> + where + C: Column, + { + let db_cursor = self.backend.raw_iterator_cf(self.cf_handle::())?; + + Ok(Cursor { + db_cursor, + column: PhantomData, + backend: PhantomData, + }) + } + + pub fn iter(&self) -> Result)>> + where + C: Column, + { + let iter = self + .backend + .iterator_cf(self.cf_handle::())? 
+ .map(|(key, value)| (C::index(&key), value.into())); + + Ok(iter) + } + + pub fn batch(&self) -> Result> { + let db_write_batch = self.backend.batch()?; + let map = self + .backend + .columns() + .into_iter() + .map(|desc| (desc, self.backend.cf_handle(desc))) + .collect(); + + Ok(WriteBatch { + write_batch: db_write_batch, + backend: PhantomData, + map, + }) + } + + pub fn write(&self, batch: WriteBatch) -> Result<()> { + self.backend.write(batch.write_batch) + } + + #[inline] + pub fn cf_handle(&self) -> B::ColumnFamily + where + C: Column, + { + self.backend.cf_handle(C::NAME).clone() } } -pub trait LedgerColumnFamilyRaw { - fn get_bytes(&self, key: &D::Key) -> Result>> { - let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key.borrow())?; - Ok(data_bytes.map(|x| x.to_vec())) +impl Cursor +where + B: Backend, + C: Column, +{ + pub fn valid(&self) -> bool { + self.db_cursor.valid() } - fn put_bytes(&self, key: &D::Key, serialized_value: &[u8]) -> Result<()> { - let db = self.db(); - db.put_cf(self.handle(), key.borrow(), &serialized_value)?; - Ok(()) + pub fn seek(&mut self, key: C::Index) { + self.db_cursor.seek(C::key(key).borrow()); } - fn delete(&self, key: &D::Key) -> Result<()> { - let db = self.db(); - db.delete_cf(self.handle(), key.borrow())?; - Ok(()) + pub fn seek_to_first(&mut self) { + self.db_cursor.seek_to_first(); } - fn raw_iterator(&self) -> D::Cursor { - let db = self.db(); - db.raw_iterator_cf(self.handle()) - .expect("Expected to be able to open database iterator") + pub fn next(&mut self) { + self.db_cursor.next(); } - fn handle(&self) -> D::ColumnFamily; + pub fn key(&self) -> Option { + if let Some(key) = self.db_cursor.key() { + Some(C::index(key.borrow())) + } else { + None + } + } - fn db(&self) -> &Arc; + pub fn value_bytes(&self) -> Option> { + self.db_cursor.value() + } } -pub trait IndexColumn: LedgerColumnFamilyRaw { - type Index; - - fn get_by_index(&self, index: &Self::Index) -> Result>> { - let db = self.db(); - 
let data_bytes = db.get_cf(self.handle(), Self::key(index).borrow())?; - Ok(data_bytes.map(|x| x.to_vec())) +impl Cursor +where + B: Backend, + C: TypedColumn, +{ + pub fn value(&self) -> Option { + if let Some(bytes) = self.db_cursor.value() { + let value = deserialize(&bytes).ok()?; + Some(value) + } else { + None + } + } +} + +impl LedgerColumn +where + B: Backend, + C: Column, +{ + pub fn new(db: &Arc>) -> Self { + LedgerColumn { + db: Arc::clone(db), + column: PhantomData, + } + } + + pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { + self.db + .backend + .put_cf(self.handle(), C::key(key).borrow(), value) + } + + pub fn get_bytes(&self, key: C::Index) -> Result>> { + self.db.backend.get_cf(self.handle(), C::key(key).borrow()) + } + + pub fn delete(&self, key: C::Index) -> Result<()> { + self.db + .backend + .delete_cf(self.handle(), C::key(key).borrow()) + } + + pub fn cursor(&self) -> Result> { + self.db.cursor() + } + + pub fn iter(&self) -> Result)>> { + self.db.iter::() + } + + pub fn handle(&self) -> B::ColumnFamily { + self.db.cf_handle::() + } + + pub fn is_empty(&self) -> Result { + let mut cursor = self.cursor()?; + cursor.seek_to_first(); + Ok(!cursor.valid()) + } +} + +impl LedgerColumn +where + B: Backend, + C: TypedColumn, +{ + pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { + self.db.put::(key, value) + } + + pub fn get(&self, key: C::Index) -> Result> { + self.db.get::(key) + } +} + +impl WriteBatch +where + B: Backend, +{ + pub fn put_bytes>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { + self.write_batch + .put_cf(self.get_cf::(), C::key(key).borrow(), bytes) + } + + pub fn delete>(&mut self, key: C::Index) -> Result<()> { + self.write_batch + .delete_cf(self.get_cf::(), C::key(key).borrow()) + } + + pub fn put>(&mut self, key: C::Index, value: &C::Type) -> Result<()> { + let serialized_value = serialize(&value)?; + self.write_batch + .put_cf(self.get_cf::(), C::key(key).borrow(), 
&serialized_value) + } + + #[inline] + fn get_cf>(&self) -> B::ColumnFamily { + self.map[C::NAME].clone() } - - fn put_by_index(&self, index: &Self::Index, serialized_value: &[u8]) -> Result<()> { - let db = self.db(); - db.put_cf(self.handle(), Self::key(index).borrow(), &serialized_value)?; - Ok(()) - } - - fn delete_by_index(&self, index: &Self::Index) -> Result<()> { - self.delete(Self::key(index).borrow()) - } - - fn index(key: &D::Key) -> Self::Index; - - fn key(index: &Self::Index) -> D::OwnedKey; } diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index eb071debd8..8ae67105ab 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -1,85 +1,42 @@ -use crate::entry::Entry; -use crate::packet::Blob; +use crate::blocktree::db::columns as cf; +use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn}; +use crate::blocktree::BlocktreeError; use crate::result::{Error, Result}; use byteorder::{BigEndian, ByteOrder}; use solana_kvstore::{self as kvstore, Key, KvStore}; -use std::sync::Arc; +use std::path::Path; -use super::db::{ - Cursor, Database, IWriteBatch, IndexColumn, LedgerColumnFamily, LedgerColumnFamilyRaw, -}; -use super::{Blocktree, BlocktreeError}; +type ColumnFamily = u64; #[derive(Debug)] pub struct Kvs(KvStore); -/// The metadata column family -#[derive(Debug)] -pub struct MetaCf { - db: Arc, -} +/// Dummy struct for now +#[derive(Debug, Clone, Copy)] +pub struct Dummy; -/// The data column family -#[derive(Debug)] -pub struct DataCf { - db: Arc, -} - -/// The erasure column family -#[derive(Debug)] -pub struct ErasureCf { - db: Arc, -} - -/// The detached heads column family -#[derive(Debug)] -pub struct DetachedHeadsCf { - db: Arc, -} - -/// Dummy struct to get things compiling -/// TODO: all this goes away with Blocktree -pub struct EntryIterator(i32); -/// Dummy struct to get things compiling -pub struct KvsCursor; -/// Dummy struct to get things compiling -pub struct ColumnFamily; -/// 
Dummy struct to get things compiling -pub struct KvsWriteBatch; - -impl Blocktree { - /// Opens a Ledger in directory, provides "infinite" window of blobs - pub fn open(_ledger_path: &str) -> Result { - unimplemented!() - } - - #[allow(unreachable_code)] - pub fn read_ledger_blobs(&self) -> impl Iterator { - unimplemented!(); - self.read_ledger().unwrap().map(|_| Blob::new(&[])) - } - - /// Return an iterator for all the entries in the given file. - #[allow(unreachable_code)] - pub fn read_ledger(&self) -> Result> { - Ok(EntryIterator(unimplemented!())) - } - - pub fn destroy(_ledger_path: &str) -> Result<()> { - unimplemented!() - } -} - -impl Database for Kvs { - type Error = kvstore::Error; +impl Backend for Kvs { type Key = Key; type OwnedKey = Key; type ColumnFamily = ColumnFamily; - type Cursor = KvsCursor; - type EntryIter = EntryIterator; - type WriteBatch = KvsWriteBatch; + type Cursor = Dummy; + type Iter = Dummy; + type WriteBatch = Dummy; + type Error = kvstore::Error; - fn cf_handle(&self, _cf: &str) -> Option { + fn open(_path: &Path) -> Result { + unimplemented!() + } + + fn columns(&self) -> Vec<&'static str> { + unimplemented!() + } + + fn destroy(_path: &Path) -> Result<()> { + unimplemented!() + } + + fn cf_handle(&self, _cf: &str) -> ColumnFamily { unimplemented!() } @@ -87,28 +44,101 @@ impl Database for Kvs { unimplemented!() } - fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _data: &[u8]) -> Result<()> { + fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> { unimplemented!() } - fn delete_cf(&self, _cf: Self::ColumnFamily, _key: &Key) -> Result<()> { + fn delete_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result<()> { unimplemented!() } - fn raw_iterator_cf(&self, _cf: Self::ColumnFamily) -> Result { + fn iterator_cf(&self, _cf: ColumnFamily) -> Result { unimplemented!() } - fn write(&self, _batch: Self::WriteBatch) -> Result<()> { + fn raw_iterator_cf(&self, _cf: ColumnFamily) -> Result { unimplemented!() } - fn 
batch(&self) -> Result { + fn batch(&self) -> Result { + unimplemented!() + } + + fn write(&self, _batch: Dummy) -> Result<()> { unimplemented!() } } -impl Cursor for KvsCursor { +impl Column for cf::Coding { + const NAME: &'static str = super::ERASURE_CF; + type Index = (u64, u64); + + fn key(index: (u64, u64)) -> Key { + cf::Data::key(index) + } + + fn index(key: &Key) -> (u64, u64) { + cf::Data::index(key) + } +} + +impl Column for cf::Data { + const NAME: &'static str = super::DATA_CF; + type Index = (u64, u64); + + fn key((slot, index): (u64, u64)) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], slot); + BigEndian::write_u64(&mut key.0[16..], index); + key + } + + fn index(key: &Key) -> (u64, u64) { + let slot = BigEndian::read_u64(&key.0[8..16]); + let index = BigEndian::read_u64(&key.0[16..]); + (slot, index) + } +} + +impl Column for cf::DetachedHeads { + const NAME: &'static str = super::DETACHED_HEADS_CF; + type Index = u64; + + fn key(slot: u64) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], slot); + key + } + + fn index(key: &Key) -> u64 { + BigEndian::read_u64(&key.0[8..16]) + } +} + +impl TypedColumn for cf::DetachedHeads { + type Type = bool; +} + +impl Column for cf::SlotMeta { + const NAME: &'static str = super::META_CF; + type Index = u64; + + fn key(slot: u64) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], slot); + key + } + + fn index(key: &Key) -> u64 { + BigEndian::read_u64(&key.0[8..16]) + } +} + +impl TypedColumn for cf::SlotMeta { + type Type = super::SlotMeta; +} + +impl DbCursor for Dummy { fn valid(&self) -> bool { unimplemented!() } @@ -134,111 +164,21 @@ impl Cursor for KvsCursor { } } -impl IWriteBatch for KvsWriteBatch { - fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _data: &[u8]) -> Result<()> { +impl IWriteBatch for Dummy { + fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> { + 
unimplemented!() + } + + fn delete_cf(&mut self, _cf: ColumnFamily, _key: &Key) -> Result<()> { unimplemented!() } } -impl LedgerColumnFamilyRaw for DataCf { - fn db(&self) -> &Arc { - &self.db - } +impl Iterator for Dummy { + type Item = (Box, Box<[u8]>); - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::DATA_CF).unwrap() - } -} - -impl IndexColumn for DataCf { - type Index = (u64, u64); - - fn index(key: &Key) -> (u64, u64) { - let slot = BigEndian::read_u64(&key.0[8..16]); - let index = BigEndian::read_u64(&key.0[16..24]); - (slot, index) - } - - fn key(idx: &(u64, u64)) -> Key { - Key::from((0, idx.0, idx.1)) - } -} - -impl LedgerColumnFamilyRaw for ErasureCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::ERASURE_CF).unwrap() - } -} - -impl IndexColumn for ErasureCf { - type Index = (u64, u64); - - fn index(key: &Key) -> (u64, u64) { - DataCf::index(key) - } - - fn key(idx: &(u64, u64)) -> Key { - DataCf::key(idx) - } -} - -impl LedgerColumnFamilyRaw for MetaCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::META_CF).unwrap() - } -} - -impl LedgerColumnFamily for MetaCf { - type ValueType = super::SlotMeta; -} - -impl IndexColumn for MetaCf { - type Index = u64; - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } - - fn key(slot: &u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], *slot); - key - } -} - -impl LedgerColumnFamilyRaw for DetachedHeadsCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::DETACHED_HEADS_CF).unwrap() - } -} - -impl LedgerColumnFamily for DetachedHeadsCf { - type ValueType = bool; -} - -impl IndexColumn for DetachedHeadsCf { - type Index = u64; - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } - - fn key(slot: &u64) -> Key { - let mut key = Key::default(); - 
BigEndian::write_u64(&mut key.0[8..16], *slot); - key + fn next(&mut self) -> Option { + unimplemented!() } } @@ -247,12 +187,3 @@ impl std::convert::From for Error { Error::BlocktreeError(BlocktreeError::KvsDb(e)) } } - -/// TODO: all this goes away with Blocktree -impl Iterator for EntryIterator { - type Item = Entry; - - fn next(&mut self) -> Option { - unimplemented!() - } -} diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index ba858944cb..cd6ef16fc9 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -1,27 +1,17 @@ -use crate::entry::{Entry, EntrySlice}; -use crate::packet::{Blob, BLOB_HEADER_SIZE}; +use crate::blocktree::db::columns as cf; +use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn}; +use crate::blocktree::BlocktreeError; use crate::result::{Error, Result}; -use bincode::deserialize; - use byteorder::{BigEndian, ByteOrder}; use rocksdb::{ - self, ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, + self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, IteratorMode, Options, WriteBatch as RWriteBatch, DB, }; -use solana_sdk::hash::Hash; - -use std::collections::VecDeque; use std::fs; use std::path::Path; -use std::sync::Arc; - -use super::db::{ - Cursor, Database, IWriteBatch, IndexColumn, LedgerColumnFamily, LedgerColumnFamilyRaw, -}; -use super::{Blocktree, BlocktreeError}; // A good value for this is the number of cores on the machine const TOTAL_THREADS: i32 = 8; @@ -30,66 +20,30 @@ const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024; #[derive(Debug)] pub struct Rocks(rocksdb::DB); -/// The metadata column family -#[derive(Debug)] -pub struct MetaCf { - db: Arc, -} +impl Backend for Rocks { + type Key = [u8]; + type OwnedKey = Vec; + type ColumnFamily = ColumnFamily; + type Cursor = DBRawIterator; + type Iter = DBIterator; + type WriteBatch = RWriteBatch; + type Error = rocksdb::Error; -/// The data column family 
-#[derive(Debug)] -pub struct DataCf { - db: Arc, -} + fn open(path: &Path) -> Result { + use crate::blocktree::db::columns::{Coding, Data, DetachedHeads, SlotMeta}; -/// The erasure column family -#[derive(Debug)] -pub struct ErasureCf { - db: Arc, -} - -/// The detached heads column family -#[derive(Debug)] -pub struct DetachedHeadsCf { - db: Arc, -} - -/// TODO: all this goes away with Blocktree -pub struct EntryIterator { - db_iterator: DBRawIterator, - - // TODO: remove me when replay_stage is iterating by block (Blocktree) - // this verification is duplicating that of replay_stage, which - // can do this in parallel - blockhash: Option, - // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 - // rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash - // when the db_iterator member above is dropped. - // _blocktree is unused, but dropping _blocktree results in a broken db_iterator - // you have to hold the database open in order to iterate over it, and in order - // for db_iterator to be able to run Drop - // _blocktree: Blocktree, - entries: VecDeque, -} - -impl Blocktree { - /// Opens a Ledger in directory, provides "infinite" window of blobs - pub fn open(ledger_path: &str) -> Result { - fs::create_dir_all(&ledger_path)?; - let ledger_path = Path::new(ledger_path).join(super::BLOCKTREE_DIRECTORY); + fs::create_dir_all(&path)?; // Use default database options - let db_options = Blocktree::get_db_options(); + let db_options = get_db_options(); // Column family names - let meta_cf_descriptor = - ColumnFamilyDescriptor::new(super::META_CF, Blocktree::get_cf_options()); - let data_cf_descriptor = - ColumnFamilyDescriptor::new(super::DATA_CF, Blocktree::get_cf_options()); - let erasure_cf_descriptor = - ColumnFamilyDescriptor::new(super::ERASURE_CF, Blocktree::get_cf_options()); - let detached_heads_descriptor = - ColumnFamilyDescriptor::new(super::DETACHED_HEADS_CF, Blocktree::get_cf_options()); + let meta_cf_descriptor = 
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); + let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); + let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); + let detached_heads_descriptor = + ColumnFamilyDescriptor::new(DetachedHeads::NAME, get_cf_options()); + let cfs = vec![ meta_cf_descriptor, data_cf_descriptor, @@ -98,138 +52,155 @@ impl Blocktree { ]; // Open the database - let db = Arc::new(Rocks(DB::open_cf_descriptors( - &db_options, - ledger_path, - cfs, - )?)); + let db = Rocks(DB::open_cf_descriptors(&db_options, path, cfs)?); - // Create the metadata column family - let meta_cf = MetaCf { db: db.clone() }; - - // Create the data column family - let data_cf = DataCf { db: db.clone() }; - - // Create the erasure column family - let erasure_cf = ErasureCf { db: db.clone() }; - - let detached_heads_cf = DetachedHeadsCf { db: db.clone() }; - - Ok(Blocktree { - db, - meta_cf, - data_cf, - erasure_cf, - detached_heads_cf, - new_blobs_signals: vec![], - }) + Ok(db) } - pub fn read_ledger_blobs(&self) -> impl Iterator { - self.db - .0 - .iterator_cf(self.data_cf.handle(), IteratorMode::Start) - .unwrap() - .map(|(_, blob_data)| Blob::new(&blob_data)) + fn columns(&self) -> Vec<&'static str> { + use crate::blocktree::db::columns::{Coding, Data, DetachedHeads, SlotMeta}; + + vec![ + Coding::NAME, + Data::NAME, + DetachedHeads::NAME, + SlotMeta::NAME, + ] } - /// Return an iterator for all the entries in the given file. 
- pub fn read_ledger(&self) -> Result> { - let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; + fn destroy(path: &Path) -> Result<()> { + DB::destroy(&Options::default(), path)?; - db_iterator.seek_to_first(); - Ok(EntryIterator { - entries: VecDeque::new(), - db_iterator, - blockhash: None, - }) - } - - pub fn destroy(ledger_path: &str) -> Result<()> { - // DB::destroy() fails if `ledger_path` doesn't exist - fs::create_dir_all(&ledger_path)?; - let ledger_path = Path::new(ledger_path).join(super::BLOCKTREE_DIRECTORY); - DB::destroy(&Options::default(), &ledger_path)?; Ok(()) } - fn get_cf_options() -> Options { - let mut options = Options::default(); - options.set_max_write_buffer_number(32); - options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); - options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); - options - } - - fn get_db_options() -> Options { - let mut options = Options::default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - options.increase_parallelism(TOTAL_THREADS); - options.set_max_background_flushes(4); - options.set_max_background_compactions(4); - options.set_max_write_buffer_number(32); - options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); - options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); - options - } -} - -impl Database for Rocks { - type Error = rocksdb::Error; - type Key = [u8]; - type OwnedKey = Vec; - type ColumnFamily = ColumnFamily; - type Cursor = DBRawIterator; - type EntryIter = EntryIterator; - type WriteBatch = RWriteBatch; - - fn cf_handle(&self, cf: &str) -> Option { - self.0.cf_handle(cf) + fn cf_handle(&self, cf: &str) -> ColumnFamily { + self.0 + .cf_handle(cf) + .expect("should never get an unknown column") } fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result>> { - let opt = self.0.get_cf(cf, key)?; - Ok(opt.map(|dbvec| dbvec.to_vec())) + let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec()); + Ok(opt) } - fn 
put_cf(&self, cf: ColumnFamily, key: &[u8], data: &[u8]) -> Result<()> { - self.0.put_cf(cf, key, data)?; + fn put_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { + self.0.put_cf(cf, key, value)?; Ok(()) } - fn delete_cf(&self, cf: Self::ColumnFamily, key: &[u8]) -> Result<()> { - self.0.delete_cf(cf, key).map_err(From::from) + fn delete_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<()> { + self.0.delete_cf(cf, key)?; + Ok(()) } - fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result { - Ok(self.0.raw_iterator_cf(cf)?) + fn iterator_cf(&self, cf: ColumnFamily) -> Result { + let raw_iter = self.0.iterator_cf(cf, IteratorMode::Start)?; + + Ok(raw_iter) } - fn write(&self, batch: Self::WriteBatch) -> Result<()> { - self.0.write(batch).map_err(From::from) + fn raw_iterator_cf(&self, cf: ColumnFamily) -> Result { + let raw_iter = self.0.raw_iterator_cf(cf)?; + + Ok(raw_iter) } - fn batch(&self) -> Result { + fn batch(&self) -> Result { Ok(RWriteBatch::default()) } + + fn write(&self, batch: RWriteBatch) -> Result<()> { + self.0.write(batch)?; + Ok(()) + } } -impl Cursor for DBRawIterator { +impl Column for cf::Coding { + const NAME: &'static str = super::ERASURE_CF; + type Index = (u64, u64); + + fn key(index: (u64, u64)) -> Vec { + cf::Data::key(index) + } + + fn index(key: &[u8]) -> (u64, u64) { + cf::Data::index(key) + } +} + +impl Column for cf::Data { + const NAME: &'static str = super::DATA_CF; + type Index = (u64, u64); + + fn key((slot, index): (u64, u64)) -> Vec { + let mut key = vec![0; 16]; + BigEndian::write_u64(&mut key[..8], slot); + BigEndian::write_u64(&mut key[8..16], index); + key + } + + fn index(key: &[u8]) -> (u64, u64) { + let slot = BigEndian::read_u64(&key[..8]); + let index = BigEndian::read_u64(&key[8..16]); + (slot, index) + } +} + +impl Column for cf::DetachedHeads { + const NAME: &'static str = super::DETACHED_HEADS_CF; + type Index = u64; + + fn key(slot: u64) -> Vec { + let mut key = vec![0; 8]; + 
BigEndian::write_u64(&mut key[..], slot); + key + } + + fn index(key: &[u8]) -> u64 { + BigEndian::read_u64(&key[..8]) + } +} + +impl TypedColumn for cf::DetachedHeads { + type Type = bool; +} + +impl Column for cf::SlotMeta { + const NAME: &'static str = super::META_CF; + type Index = u64; + + fn key(slot: u64) -> Vec { + let mut key = vec![0; 8]; + BigEndian::write_u64(&mut key[..], slot); + key + } + + fn index(key: &[u8]) -> u64 { + BigEndian::read_u64(&key[..8]) + } +} + +impl TypedColumn for cf::SlotMeta { + type Type = super::SlotMeta; +} + +impl DbCursor for DBRawIterator { fn valid(&self) -> bool { DBRawIterator::valid(self) } fn seek(&mut self, key: &[u8]) { - DBRawIterator::seek(self, key) + DBRawIterator::seek(self, key); } fn seek_to_first(&mut self) { - DBRawIterator::seek_to_first(self) + DBRawIterator::seek_to_first(self); } fn next(&mut self) { - DBRawIterator::next(self) + DBRawIterator::next(self); } fn key(&self) -> Option> { @@ -242,114 +213,14 @@ impl Cursor for DBRawIterator { } impl IWriteBatch for RWriteBatch { - fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], data: &[u8]) -> Result<()> { - RWriteBatch::put_cf(self, cf, key, data)?; + fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { + RWriteBatch::put_cf(self, cf, key, value)?; Ok(()) } -} -impl LedgerColumnFamilyRaw for DataCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::DATA_CF).unwrap() - } -} - -impl IndexColumn for DataCf { - type Index = (u64, u64); - - fn index(key: &[u8]) -> (u64, u64) { - let slot = BigEndian::read_u64(&key[..8]); - let index = BigEndian::read_u64(&key[8..16]); - (slot, index) - } - - fn key(idx: &(u64, u64)) -> Vec { - let mut key = vec![0u8; 16]; - BigEndian::write_u64(&mut key[0..8], idx.0); - BigEndian::write_u64(&mut key[8..16], idx.1); - key - } -} - -impl LedgerColumnFamilyRaw for ErasureCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> 
ColumnFamily { - self.db.cf_handle(super::ERASURE_CF).unwrap() - } -} - -impl IndexColumn for ErasureCf { - type Index = (u64, u64); - - fn index(key: &[u8]) -> (u64, u64) { - DataCf::index(key) - } - - fn key(idx: &(u64, u64)) -> Vec { - DataCf::key(idx) - } -} - -impl LedgerColumnFamilyRaw for MetaCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::META_CF).unwrap() - } -} - -impl LedgerColumnFamily for MetaCf { - type ValueType = super::SlotMeta; -} - -impl IndexColumn for MetaCf { - type Index = u64; - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn key(slot: &u64) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], *slot); - key - } -} - -impl LedgerColumnFamilyRaw for DetachedHeadsCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(super::DETACHED_HEADS_CF).unwrap() - } -} - -impl LedgerColumnFamily for DetachedHeadsCf { - type ValueType = bool; -} - -impl IndexColumn for DetachedHeadsCf { - type Index = u64; - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn key(slot: &u64) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], *slot); - key + fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<()> { + RWriteBatch::delete_cf(self, cf, key)?; + Ok(()) } } @@ -359,34 +230,23 @@ impl std::convert::From for Error { } } -/// TODO: all this goes away with Blocktree -impl Iterator for EntryIterator { - type Item = Entry; - - fn next(&mut self) -> Option { - if !self.entries.is_empty() { - return Some(self.entries.pop_front().unwrap()); - } - - if self.db_iterator.valid() { - if let Some(value) = self.db_iterator.value() { - if let Ok(next_entries) = deserialize::>(&value[BLOB_HEADER_SIZE..]) { - if let Some(blockhash) = self.blockhash { - if !next_entries.verify(&blockhash) { - return None; - } - } - self.db_iterator.next(); - if 
next_entries.is_empty() { - return None; - } - self.entries = VecDeque::from(next_entries); - let entry = self.entries.pop_front().unwrap(); - self.blockhash = Some(entry.hash); - return Some(entry); - } - } - } - None - } +fn get_cf_options() -> Options { + let mut options = Options::default(); + options.set_max_write_buffer_number(32); + options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); + options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); + options +} + +fn get_db_options() -> Options { + let mut options = Options::default(); + options.create_if_missing(true); + options.create_missing_column_families(true); + options.increase_parallelism(TOTAL_THREADS); + options.set_max_background_flushes(4); + options.set_max_background_compactions(4); + options.set_max_write_buffer_number(32); + options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); + options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); + options }