From 16ff4ac1a8c74062a8888f21aaf6402648d870cc Mon Sep 17 00:00:00 2001 From: Mark Date: Wed, 27 Mar 2019 01:36:39 -0500 Subject: [PATCH] Simplify storage interface in blocktree (#3522) --- core/src/blocktree.rs | 166 +++++++++++++++++------------------- core/src/blocktree/db.rs | 148 +++++++++----------------------- core/src/blocktree/kvs.rs | 135 ++++++++++------------------- core/src/blocktree/rocks.rs | 165 ++++++++++++----------------------- 4 files changed, 218 insertions(+), 396 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 658a34b54a..b0e143e0db 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -31,45 +31,29 @@ use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; mod db; -#[cfg(feature = "kvstore")] -mod kvs; -#[cfg(not(feature = "kvstore"))] -mod rocks; -#[cfg(feature = "kvstore")] -use self::kvs::{DataCf, ErasureCf, Kvs, MetaCf}; -#[cfg(not(feature = "kvstore"))] -use self::rocks::{DataCf, ErasureCf, MetaCf, Rocks}; +macro_rules! db_imports { + { $mod:ident, $db:ident, $db_path:expr } => { + mod $mod; -pub use db::{ - Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily, - LedgerColumnFamilyRaw, -}; + pub use db::{ + Cursor, Database, IndexColumn, IWriteBatch, LedgerColumnFamily, + LedgerColumnFamilyRaw, + }; + + pub use $mod::{$db, ErasureCf, MetaCf, DataCf}; + pub type BlocktreeRawIterator = <$db as Database>::Cursor; + pub type WriteBatch = <$db as Database>::WriteBatch; + pub type OwnedKey = <$db as Database>::OwnedKey; + pub type Key = <$db as Database>::Key; + pub const BLOCKTREE_DIRECTORY: &str = $db_path; + }; +} #[cfg(not(feature = "kvstore"))] -pub type BlocktreeRawIterator = ::Cursor; +db_imports! {rocks, Rocks, "rocksdb"} #[cfg(feature = "kvstore")] -pub type BlocktreeRawIterator = ::Cursor; - -#[cfg(not(feature = "kvstore"))] -pub type WriteBatch = ::WriteBatch; -#[cfg(feature = "kvstore")] -pub type WriteBatch = ::WriteBatch; - -#[cfg(not(feature = "kvstore"))] -type KeyRef = ::KeyRef; -#[cfg(feature = "kvstore")] -type KeyRef = ::KeyRef; - -#[cfg(not(feature = "kvstore"))] -pub type Key = ::Key; -#[cfg(feature = "kvstore")] -pub type Key = ::Key; - -#[cfg(not(feature = "kvstore"))] -pub const BLOCKTREE_DIRECTORY: &str = "rocksdb"; -#[cfg(feature = "kvstore")] -pub const BLOCKTREE_DIRECTORY: &str = "kvstore"; +db_imports! {kvs, Kvs, "kvstore"} #[derive(Debug)] pub enum BlocktreeError { @@ -161,14 +145,14 @@ impl Blocktree { } pub fn meta(&self, slot: u64) -> Result> { - self.meta_cf.get(&MetaCf::key(slot)) + self.meta_cf.get(&MetaCf::key(&slot)) } pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> { - let meta_key = MetaCf::key(slot); + let meta_key = MetaCf::key(&slot); if let Some(mut meta) = self.meta_cf.get(&meta_key)? { for index in 0..meta.received { - self.data_cf.delete_by_slot_index(slot, index)?; + self.data_cf.delete_by_index(&(slot, index))?; } meta.consumed = 0; meta.received = 0; @@ -181,12 +165,12 @@ impl Blocktree { pub fn get_next_slot(&self, slot: u64) -> Result> { let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?; - db_iterator.seek(&MetaCf::key(slot + 1)); + db_iterator.seek(&MetaCf::key(&(slot + 1))); if !db_iterator.valid() { Ok(None) } else { let key = &db_iterator.key().expect("Expected valid key"); - Ok(Some(MetaCf::index_from_key(&key)?)) + Ok(Some(MetaCf::index(&key))) } } @@ -341,7 +325,7 @@ impl Blocktree { should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup); write_batch.put_cf( self.meta_cf.handle(), - &MetaCf::key(*slot), + &MetaCf::key(slot), &serialize(&meta)?, )?; } @@ -368,7 +352,7 @@ impl Blocktree { buf: &mut [u8], slot: u64, ) -> Result<(u64, u64)> { - let start_key = DataCf::key(slot, start_index); + let start_key = DataCf::key(&(slot, start_index)); let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; db_iterator.seek(&start_key); let mut total_blobs = 0; @@ -388,7 +372,7 @@ impl Blocktree { // Check key is the next sequential key based on // blob index let key = &db_iterator.key().expect("Expected valid key"); - let index = DataCf::index_from_key(key)?; + let index = DataCf::index(key).1; if index != expected_index { break; } @@ -421,24 +405,24 @@ impl Blocktree { } pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { - self.erasure_cf.get_by_slot_index(slot, index) + self.erasure_cf.get_by_index(&(slot, index)) } pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { - self.erasure_cf.delete_by_slot_index(slot, index) + self.erasure_cf.delete_by_index(&(slot, index)) } pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result>> { - self.data_cf.get_by_slot_index(slot, index) + self.data_cf.get_by_index(&(slot, index)) } pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.erasure_cf.put_by_slot_index(slot, index, bytes) + self.erasure_cf.put_by_index(&(slot, index), bytes) } - pub fn put_data_raw(&self, key: &KeyRef, value: &[u8]) -> Result<()> { - self.data_cf.put(key, value) + pub fn put_data_raw(&self, key: &Key, value: &[u8]) -> Result<()> { + self.data_cf.put_bytes(key, value) } pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - self.data_cf.put_by_slot_index(slot, index, bytes) + self.data_cf.put_by_index(&(slot, index), bytes) } pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result> { @@ -468,9 +452,9 @@ impl Blocktree { slot: u64, start_index: u64, end_index: u64, - key: &dyn Fn(u64, u64) -> Key, - slot_from_key: &dyn Fn(&KeyRef) -> Result, - index_from_key: &dyn Fn(&KeyRef) -> Result, + key: &dyn Fn(u64, u64) -> OwnedKey, + slot_from_key: &dyn Fn(&Key) -> u64, + index_from_key: &dyn Fn(&Key) -> u64, max_missing: usize, ) -> Vec { if start_index >= end_index || max_missing == 0 { @@ -495,14 +479,12 @@ impl Blocktree { break; } let current_key = db_iterator.key().expect("Expect a valid key"); - let current_slot = slot_from_key(¤t_key) - .expect("Expect to be able to parse slot from valid key"); + let current_slot = slot_from_key(¤t_key); let current_index = { if current_slot > slot { end_index } else { index_from_key(¤t_key) - .expect("Expect to be able to parse index from valid key") } }; let upper_index = cmp::min(current_index, end_index); @@ -543,9 +525,9 @@ impl Blocktree { slot, start_index, end_index, - &DataCf::key, - &DataCf::slot_from_key, - &DataCf::index_from_key, + &|slot, index| DataCf::key(&(slot, index)), + &MetaCf::index, + &|key| DataCf::index(key).1, max_missing, ) } @@ -564,9 +546,9 @@ impl Blocktree { slot, start_index, end_index, - &ErasureCf::key, - &ErasureCf::slot_from_key, - &ErasureCf::index_from_key, + &|slot, index| ErasureCf::key(&(slot, index)), + &MetaCf::index, + &|key| ErasureCf::index(key).1, max_missing, ) } @@ -661,11 +643,7 @@ impl Blocktree { // Write all the newly changed slots in new_chained_slots to the write_batch for (slot, meta_copy) in new_chained_slots.iter() { let meta: &SlotMeta = &RefCell::borrow(&*meta_copy); - write_batch.put_cf( - self.meta_cf.handle(), - &MetaCf::key(*slot), - &serialize(meta)?, - )?; + write_batch.put_cf(self.meta_cf.handle(), &MetaCf::key(slot), &serialize(meta)?)?; } Ok(()) } @@ -848,7 +826,7 @@ impl Blocktree { } }; - let key = DataCf::key(blob_slot, blob_index); + let key = DataCf::key(&(blob_slot, blob_index)); let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; // Commit step: commit all changes to the mutable structures at once, or none at all. @@ -892,7 +870,7 @@ impl Blocktree { // Try to find the next blob we're looking for in the prev_inserted_blob_datas if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) { blobs.push(Cow::Borrowed(*prev_blob_data)); - } else if let Some(blob_data) = self.data_cf.get_by_slot_index(slot, current_index)? { + } else if let Some(blob_data) = self.data_cf.get_by_index(&(slot, current_index))? { // Try to find the next blob we're looking for in the database blobs.push(Cow::Owned(blob_data)); } else { @@ -909,7 +887,7 @@ impl Blocktree { // don't count as ticks, even if they're empty entries fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> { // TODO: change bootstrap height to number of slots - let meta_key = MetaCf::key(0); + let meta_key = MetaCf::key(&0); let mut bootstrap_meta = SlotMeta::new(0, 1); let last = blobs.last().unwrap(); @@ -924,7 +902,7 @@ impl Blocktree { &serialize(&bootstrap_meta)?, )?; for blob in blobs { - let key = DataCf::key(blob.slot(), blob.index()); + let key = DataCf::key(&(blob.slot(), blob.index())); let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; } @@ -1171,7 +1149,7 @@ pub mod tests { // Test meta column family let meta = SlotMeta::new(0, 1); - let meta_key = MetaCf::key(0); + let meta_key = MetaCf::key(&0); ledger.meta_cf.put(&meta_key, &meta).unwrap(); let result = ledger .meta_cf @@ -1183,12 +1161,15 @@ pub mod tests { // Test erasure column family let erasure = vec![1u8; 16]; - let erasure_key = ErasureCf::key(0, 0); - ledger.erasure_cf.put(&erasure_key, &erasure).unwrap(); + let erasure_key = ErasureCf::key(&(0, 0)); + ledger + .erasure_cf + .put_bytes(&erasure_key[..], &erasure) + .unwrap(); let result = ledger .erasure_cf - .get(&erasure_key) + .get_bytes(&erasure_key[..]) .unwrap() .expect("Expected erasure object to exist"); @@ -1196,12 +1177,12 @@ pub mod tests { // Test data column family let data = vec![2u8; 16]; - let data_key = DataCf::key(0, 0); - ledger.data_cf.put(&data_key, &data).unwrap(); + let data_key = DataCf::key(&(0, 0)); + ledger.data_cf.put_bytes(&data_key, &data).unwrap(); let result = ledger .data_cf - .get(&data_key) + .get_bytes(&data_key) .unwrap() .expect("Expected data object to exist"); @@ -1296,7 +1277,7 @@ pub mod tests { let meta = ledger .meta_cf - .get(&MetaCf::key(0)) + .get(&MetaCf::key(&0)) .unwrap() .expect("Expected new metadata object to be created"); assert!(meta.consumed == 0 && meta.received == num_entries); @@ -1311,7 +1292,7 @@ pub mod tests { let meta = ledger .meta_cf - .get(&MetaCf::key(0)) + .get(&MetaCf::key(&0)) .unwrap() .expect("Expected new metadata object to exist"); assert_eq!(meta.consumed, num_entries); @@ -1341,7 +1322,7 @@ pub mod tests { let meta = ledger .meta_cf - .get(&MetaCf::key(0)) + .get(&MetaCf::key(&0)) .unwrap() .expect("Expected metadata object to exist"); assert_eq!(meta.parent_slot, 0); @@ -1392,14 +1373,13 @@ pub mod tests { .raw_iterator_cf(blocktree.data_cf.handle()) .expect("Expected to be able to open database iterator"); - db_iterator.seek(&DataCf::key(slot, 1)); + db_iterator.seek(&DataCf::key(&(slot, 1))); // Iterate through ledger for i in 0..num_entries { assert!(db_iterator.valid()); let current_key = db_iterator.key().expect("Expected a valid key"); - let current_index = DataCf::index_from_key(¤t_key) - .expect("Expect to be able to parse index from valid key"); + let current_index = DataCf::index(¤t_key).1; assert_eq!(current_index, (1 as u64) << (i * 8)); db_iterator.next(); } @@ -1519,7 +1499,7 @@ pub mod tests { assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); - let meta_key = MetaCf::key(slot); + let meta_key = MetaCf::key(&slot); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); if num_entries % 2 == 0 { assert_eq!(meta.received, num_entries); @@ -1541,7 +1521,7 @@ pub mod tests { original_entries, ); - let meta_key = MetaCf::key(slot); + let meta_key = MetaCf::key(&slot); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.received, num_entries); assert_eq!(meta.consumed, num_entries); @@ -1594,7 +1574,7 @@ pub mod tests { assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,); - let meta_key = MetaCf::key(0); + let meta_key = MetaCf::key(&0); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.consumed, num_unique_entries); assert_eq!(meta.received, num_unique_entries); @@ -2053,14 +2033,17 @@ pub mod tests { assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty()); let mut meta0 = SlotMeta::new(0, 0); - blocktree.meta_cf.put_slot_meta(0, &meta0).unwrap(); + blocktree + .meta_cf + .put_by_index(&0, &serialize(&meta0).unwrap()) + .unwrap(); // Slot exists, chains to nothing let expected: HashMap> = HashMap::from_iter(vec![(0, vec![])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected); meta0.next_slots = vec![1, 2]; - blocktree.meta_cf.put_slot_meta(0, &meta0).unwrap(); + blocktree.meta_cf.put(&MetaCf::key(&0), &meta0).unwrap(); // Slot exists, chains to some other slots let expected: HashMap> = @@ -2070,7 +2053,10 @@ pub mod tests { let mut meta3 = SlotMeta::new(3, 1); meta3.next_slots = vec![10, 5]; - blocktree.meta_cf.put_slot_meta(3, &meta3).unwrap(); + blocktree + .meta_cf + .put_by_index(&3, &serialize(&meta3).unwrap()) + .unwrap(); let expected: HashMap> = HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected); @@ -2119,7 +2105,7 @@ pub mod tests { entries[i as usize] ); - let meta_key = MetaCf::key(i); + let meta_key = MetaCf::key(&i); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.received, i + 1); assert_eq!(meta.last_index, i); diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 2cf315ac0e..a522f14140 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -11,8 +11,8 @@ use std::sync::Arc; pub trait Database: Sized + Send + Sync { type Error: Into; - type Key: Borrow; - type KeyRef: ?Sized; + type Key: ?Sized; + type OwnedKey: Borrow; type ColumnFamily; type Cursor: Cursor; type EntryIter: Iterator; @@ -20,11 +20,11 @@ pub trait Database: Sized + Send + Sync { fn cf_handle(&self, cf: &str) -> Option; - fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::KeyRef) -> Result>>; + fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result>>; - fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::KeyRef, data: &[u8]) -> Result<()>; + fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, data: &[u8]) -> Result<()>; - fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::KeyRef) -> Result<()>; + fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>; fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result; @@ -36,93 +36,25 @@ pub trait Database: Sized + Send + Sync { pub trait Cursor { fn valid(&self) -> bool; - fn seek(&mut self, key: &D::KeyRef); + fn seek(&mut self, key: &D::Key); fn seek_to_first(&mut self); fn next(&mut self); - fn key(&self) -> Option; + fn key(&self) -> Option; fn value(&self) -> Option>; } pub trait IWriteBatch { - fn put_cf(&mut self, cf: D::ColumnFamily, key: &D::KeyRef, data: &[u8]) -> Result<()>; + fn put_cf(&mut self, cf: D::ColumnFamily, key: &D::Key, data: &[u8]) -> Result<()>; } -pub trait IDataCf: LedgerColumnFamilyRaw { - fn new(db: Arc) -> Self; - - fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { - let key = Self::key(slot, index); - self.get(key.borrow()) - } - - fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { - let key = Self::key(slot, index); - self.delete(&key.borrow()) - } - - fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { - let key = Self::key(slot, index); - self.put(key.borrow(), serialized_value) - } - - fn key(slot: u64, index: u64) -> D::Key; - - fn slot_from_key(key: &D::KeyRef) -> Result; - - fn index_from_key(key: &D::KeyRef) -> Result; -} - -pub trait IErasureCf: LedgerColumnFamilyRaw { - fn new(db: Arc) -> Self; - - fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { - let key = Self::key(slot, index); - self.delete(key.borrow()) - } - - fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { - let key = Self::key(slot, index); - self.get(key.borrow()) - } - - fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { - let key = Self::key(slot, index); - self.put(key.borrow(), serialized_value) - } - - fn key(slot: u64, index: u64) -> D::Key; - - fn slot_from_key(key: &D::KeyRef) -> Result; - - fn index_from_key(key: &D::KeyRef) -> Result; -} - -pub trait IMetaCf: LedgerColumnFamily { - fn new(db: Arc) -> Self; - - fn key(slot: u64) -> D::Key; - - fn get_slot_meta(&self, slot: u64) -> Result> { - let key = Self::key(slot); - self.get(key.borrow()) - } - - fn put_slot_meta(&self, slot: u64, slot_meta: &super::SlotMeta) -> Result<()> { - let key = Self::key(slot); - self.put(key.borrow(), slot_meta) - } - - fn index_from_key(key: &D::KeyRef) -> Result; -} - -pub trait LedgerColumnFamily { +pub trait LedgerColumnFamily: LedgerColumnFamilyRaw { type ValueType: DeserializeOwned + Serialize; - fn get(&self, key: &D::KeyRef) -> Result> { + fn get(&self, key: &D::Key) -> Result> { let db = self.db(); let data_bytes = db.get_cf(self.handle(), key)?; @@ -134,52 +66,30 @@ pub trait LedgerColumnFamily { } } - fn get_bytes(&self, key: &D::KeyRef) -> Result>> { - let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key)?; - Ok(data_bytes.map(|x| x.to_vec())) - } - - fn put_bytes(&self, key: &D::KeyRef, serialized_value: &[u8]) -> Result<()> { - let db = self.db(); - db.put_cf(self.handle(), key, &serialized_value)?; - Ok(()) - } - - fn put(&self, key: &D::KeyRef, value: &Self::ValueType) -> Result<()> { + fn put(&self, key: &D::Key, value: &Self::ValueType) -> Result<()> { let db = self.db(); let serialized = serialize(value)?; db.put_cf(self.handle(), key, &serialized)?; Ok(()) } - - fn delete(&self, key: &D::KeyRef) -> Result<()> { - let db = self.db(); - db.delete_cf(self.handle(), key)?; - Ok(()) - } - - fn db(&self) -> &Arc; - - fn handle(&self) -> D::ColumnFamily; } pub trait LedgerColumnFamilyRaw { - fn get(&self, key: &D::KeyRef) -> Result>> { + fn get_bytes(&self, key: &D::Key) -> Result>> { let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key)?; + let data_bytes = db.get_cf(self.handle(), key.borrow())?; Ok(data_bytes.map(|x| x.to_vec())) } - fn put(&self, key: &D::KeyRef, serialized_value: &[u8]) -> Result<()> { + fn put_bytes(&self, key: &D::Key, serialized_value: &[u8]) -> Result<()> { let db = self.db(); - db.put_cf(self.handle(), &key, &serialized_value)?; + db.put_cf(self.handle(), key.borrow(), &serialized_value)?; Ok(()) } - fn delete(&self, key: &D::KeyRef) -> Result<()> { + fn delete(&self, key: &D::Key) -> Result<()> { let db = self.db(); - db.delete_cf(self.handle(), &key)?; + db.delete_cf(self.handle(), key.borrow())?; Ok(()) } @@ -193,3 +103,27 @@ pub trait LedgerColumnFamilyRaw { fn db(&self) -> &Arc; } + +pub trait IndexColumn: LedgerColumnFamilyRaw { + type Index; + + fn get_by_index(&self, index: &Self::Index) -> Result>> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), Self::key(index).borrow())?; + Ok(data_bytes.map(|x| x.to_vec())) + } + + fn put_by_index(&self, index: &Self::Index, serialized_value: &[u8]) -> Result<()> { + let db = self.db(); + db.put_cf(self.handle(), Self::key(index).borrow(), &serialized_value)?; + Ok(()) + } + + fn delete_by_index(&self, index: &Self::Index) -> Result<()> { + self.delete(Self::key(index).borrow()) + } + + fn index(key: &D::Key) -> Self::Index; + + fn key(index: &Self::Index) -> D::OwnedKey; +} diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index 87f0d4e176..354e05cb6b 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -1,13 +1,12 @@ use crate::entry::Entry; use crate::packet::Blob; use crate::result::{Error, Result}; +use byteorder::{BigEndian, ByteOrder}; use solana_kvstore::{self as kvstore, Key, KvStore}; - use std::sync::Arc; use super::db::{ - Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily, - LedgerColumnFamilyRaw, + Cursor, Database, IWriteBatch, IndexColumn, LedgerColumnFamily, LedgerColumnFamilyRaw, }; use super::{Blocktree, BlocktreeError}; @@ -68,7 +67,7 @@ impl Blocktree { impl Database for Kvs { type Error = kvstore::Error; type Key = Key; - type KeyRef = Key; + type OwnedKey = Key; type ColumnFamily = ColumnFamily; type Cursor = KvsCursor; type EntryIter = EntryIterator; @@ -135,88 +134,6 @@ impl IWriteBatch for KvsWriteBatch { } } -impl IDataCf for DataCf { - fn new(db: Arc) -> Self { - DataCf { db } - } - - fn get_by_slot_index(&self, _slot: u64, _index: u64) -> Result>> { - unimplemented!() - } - - fn delete_by_slot_index(&self, _slot: u64, _index: u64) -> Result<()> { - unimplemented!() - } - - fn put_by_slot_index(&self, _slot: u64, _index: u64, _serialized_value: &[u8]) -> Result<()> { - unimplemented!() - } - - fn key(_slot: u64, _index: u64) -> Key { - unimplemented!() - } - - fn slot_from_key(_key: &Key) -> Result { - unimplemented!() - } - - fn index_from_key(_key: &Key) -> Result { - unimplemented!() - } -} - -impl IErasureCf for ErasureCf { - fn new(db: Arc) -> Self { - ErasureCf { db } - } - - fn delete_by_slot_index(&self, _slot: u64, _index: u64) -> Result<()> { - unimplemented!() - } - - fn get_by_slot_index(&self, _slot: u64, _index: u64) -> Result>> { - unimplemented!() - } - - fn put_by_slot_index(&self, _slot: u64, _index: u64, _serialized_value: &[u8]) -> Result<()> { - unimplemented!() - } - - fn key(slot: u64, index: u64) -> Key { - DataCf::key(slot, index) - } - - fn slot_from_key(key: &Key) -> Result { - DataCf::slot_from_key(key) - } - - fn index_from_key(key: &Key) -> Result { - DataCf::index_from_key(key) - } -} - -impl IMetaCf for MetaCf { - fn new(db: Arc) -> Self { - MetaCf { db } - } - - fn key(_slot: u64) -> Key { - unimplemented!() - } - - fn get_slot_meta(&self, _slot: u64) -> Result> { - unimplemented!() - } - - fn put_slot_meta(&self, _slot: u64, _slot_meta: &super::SlotMeta) -> Result<()> { - unimplemented!() - } - - fn index_from_key(_key: &Key) -> Result { - unimplemented!() - } -} - impl LedgerColumnFamilyRaw for DataCf { fn db(&self) -> &Arc { &self.db @@ -227,6 +144,20 @@ impl LedgerColumnFamilyRaw for DataCf { } } +impl IndexColumn for DataCf { + type Index = (u64, u64); + + fn index(key: &Key) -> (u64, u64) { + let slot = BigEndian::read_u64(&key.0[8..16]); + let index = BigEndian::read_u64(&key.0[16..24]); + (slot, index) + } + + fn key(idx: &(u64, u64)) -> Key { + Key::from((0, idx.0, idx.1)) + } +} + impl LedgerColumnFamilyRaw for ErasureCf { fn db(&self) -> &Arc { &self.db @@ -237,9 +168,19 @@ impl LedgerColumnFamilyRaw for ErasureCf { } } -impl LedgerColumnFamily for MetaCf { - type ValueType = super::SlotMeta; +impl IndexColumn for ErasureCf { + type Index = (u64, u64); + fn index(key: &Key) -> (u64, u64) { + DataCf::index(key) + } + + fn key(idx: &(u64, u64)) -> Key { + DataCf::key(idx) + } +} + +impl LedgerColumnFamilyRaw for MetaCf { fn db(&self) -> &Arc { &self.db } @@ -249,6 +190,24 @@ impl LedgerColumnFamily for MetaCf { } } +impl LedgerColumnFamily for MetaCf { + type ValueType = super::SlotMeta; +} + +impl IndexColumn for MetaCf { + type Index = u64; + + fn index(key: &Key) -> u64 { + BigEndian::read_u64(&key.0[8..16]) + } + + fn key(slot: &u64) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], *slot); + key + } +} + impl std::convert::From for Error { fn from(e: kvstore::Error) -> Error { Error::BlocktreeError(BlocktreeError::KvsDb(e)) diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index dd594a44fd..1b9e3a2086 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -4,7 +4,7 @@ use crate::result::{Error, Result}; use bincode::deserialize; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; +use byteorder::{BigEndian, ByteOrder}; use rocksdb::{ self, ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, @@ -15,13 +15,11 @@ use solana_sdk::hash::Hash; use std::collections::VecDeque; use std::fs; -use std::io; use std::path::Path; use std::sync::Arc; use super::db::{ - Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily, - LedgerColumnFamilyRaw, + Cursor, Database, IWriteBatch, IndexColumn, LedgerColumnFamily, LedgerColumnFamilyRaw, }; use super::{Blocktree, BlocktreeError}; @@ -98,13 +96,13 @@ impl Blocktree { )?)); // Create the metadata column family - let meta_cf = MetaCf::new(db.clone()); + let meta_cf = MetaCf { db: db.clone() }; // Create the data column family - let data_cf = DataCf::new(db.clone()); + let data_cf = DataCf { db: db.clone() }; // Create the erasure column family - let erasure_cf = ErasureCf::new(db.clone()); + let erasure_cf = ErasureCf { db: db.clone() }; Ok(Blocktree { db, @@ -167,8 +165,8 @@ impl Blocktree { impl Database for Rocks { type Error = rocksdb::Error; - type Key = Vec; - type KeyRef = [u8]; + type Key = [u8]; + type OwnedKey = Vec; type ColumnFamily = ColumnFamily; type Cursor = DBRawIterator; type EntryIter = EntryIterator; @@ -238,106 +236,6 @@ impl IWriteBatch for RWriteBatch { } } -impl IDataCf for DataCf { - fn new(db: Arc) -> Self { - DataCf { db } - } - - fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { - let key = Self::key(slot, index); - self.get(&key) - } - - fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { - let key = Self::key(slot, index); - self.delete(&key) - } - - fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { - let key = Self::key(slot, index); - self.put(&key, serialized_value) - } - - fn key(slot: u64, index: u64) -> Vec { - let mut key = vec![0u8; 16]; - BigEndian::write_u64(&mut key[0..8], slot); - BigEndian::write_u64(&mut key[8..16], index); - key - } - - fn slot_from_key(key: &[u8]) -> Result { - let mut rdr = io::Cursor::new(&key[0..8]); - let height = rdr.read_u64::()?; - Ok(height) - } - - fn index_from_key(key: &[u8]) -> Result { - let mut rdr = io::Cursor::new(&key[8..16]); - let index = rdr.read_u64::()?; - Ok(index) - } -} - -impl IErasureCf for ErasureCf { - fn new(db: Arc) -> Self { - ErasureCf { db } - } - fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { - let key = Self::key(slot, index); - self.delete(&key) - } - - fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { - let key = Self::key(slot, index); - self.get(&key) - } - - fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { - let key = Self::key(slot, index); - self.put(&key, serialized_value) - } - - fn key(slot: u64, index: u64) -> Vec { - DataCf::key(slot, index) - } - - fn slot_from_key(key: &[u8]) -> Result { - DataCf::slot_from_key(key) - } - - fn index_from_key(key: &[u8]) -> Result { - DataCf::index_from_key(key) - } -} - -impl IMetaCf for MetaCf { - fn new(db: Arc) -> Self { - MetaCf { db } - } - - fn key(slot: u64) -> Vec { - let mut key = vec![0u8; 8]; - BigEndian::write_u64(&mut key[0..8], slot); - key - } - - fn get_slot_meta(&self, slot: u64) -> Result> { - let key = Self::key(slot); - self.get(&key) - } - - fn put_slot_meta(&self, slot: u64, slot_meta: &super::SlotMeta) -> Result<()> { - let key = Self::key(slot); - self.put(&key, slot_meta) - } - - fn index_from_key(key: &[u8]) -> Result { - let mut rdr = io::Cursor::new(&key[..]); - let index = rdr.read_u64::()?; - Ok(index) - } -} - impl LedgerColumnFamilyRaw for DataCf { fn db(&self) -> &Arc { &self.db @@ -348,6 +246,23 @@ impl LedgerColumnFamilyRaw for DataCf { } } +impl IndexColumn for DataCf { + type Index = (u64, u64); + + fn index(key: &[u8]) -> (u64, u64) { + let slot = BigEndian::read_u64(&key[..8]); + let index = BigEndian::read_u64(&key[8..16]); + (slot, index) + } + + fn key(idx: &(u64, u64)) -> Vec { + let mut key = vec![0u8; 16]; + BigEndian::write_u64(&mut key[0..8], idx.0); + BigEndian::write_u64(&mut key[8..16], idx.1); + key + } +} + impl LedgerColumnFamilyRaw for ErasureCf { fn db(&self) -> &Arc { &self.db @@ -358,9 +273,19 @@ impl LedgerColumnFamilyRaw for ErasureCf { } } -impl LedgerColumnFamily for MetaCf { - type ValueType = super::SlotMeta; +impl IndexColumn for ErasureCf { + type Index = (u64, u64); + fn index(key: &[u8]) -> (u64, u64) { + DataCf::index(key) + } + + fn key(idx: &(u64, u64)) -> Vec { + DataCf::key(idx) + } +} + +impl LedgerColumnFamilyRaw for MetaCf { fn db(&self) -> &Arc { &self.db } @@ -370,6 +295,24 @@ impl LedgerColumnFamily for MetaCf { } } +impl LedgerColumnFamily for MetaCf { + type ValueType = super::SlotMeta; +} + +impl IndexColumn for MetaCf { + type Index = u64; + + fn index(key: &[u8]) -> u64 { + BigEndian::read_u64(&key[..8]) + } + + fn key(slot: &u64) -> Vec { + let mut key = vec![0; 8]; + BigEndian::write_u64(&mut key[..], *slot); + key + } +} + impl std::convert::From for Error { fn from(e: rocksdb::Error) -> Error { Error::BlocktreeError(BlocktreeError::RocksDb(e))