diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index 8835b7a1d..5d0b13d4c 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -1,7 +1,10 @@ //! The `blocktree` module provides functions for parallel verification of the //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. -use crate::blocktree_db::{self, columns as cf, Column, IteratorDirection, IteratorMode}; +use crate::blocktree_db::{ + columns as cf, BatchProcessor, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, + WriteBatch, +}; pub use crate::blocktree_db::{BlocktreeError, Result}; pub use crate::blocktree_meta::SlotMeta; use crate::blocktree_meta::*; @@ -30,11 +33,6 @@ use std::rc::Rc; use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError}; use std::sync::{Arc, RwLock}; -type Database = blocktree_db::Database; -type LedgerColumn = blocktree_db::LedgerColumn; -type WriteBatch = blocktree_db::WriteBatch; -type BatchProcessor = blocktree_db::BatchProcessor; - pub const BLOCKTREE_DIRECTORY: &str = "rocksdb"; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() @@ -239,7 +237,10 @@ impl Blocktree { self.orphans_cf.get(slot) } - pub fn slot_meta_iterator(&self, slot: u64) -> Result> { + pub fn slot_meta_iterator<'a>( + &'a self, + slot: u64, + ) -> Result + 'a> { let meta_iter = self .db .iter::(IteratorMode::From(slot, IteratorDirection::Forward))?; @@ -252,10 +253,10 @@ impl Blocktree { })) } - pub fn slot_data_iterator( - &self, + pub fn slot_data_iterator<'a>( + &'a self, slot: u64, - ) -> Result)>> { + ) -> Result)> + 'a> { let slot_iterator = self .db .iter::(IteratorMode::From((slot, 0), IteratorDirection::Forward))?; diff --git a/ledger/src/blocktree_db.rs b/ledger/src/blocktree_db.rs index e23b3da3e..f778cdec2 100644 --- a/ledger/src/blocktree_db.rs +++ b/ledger/src/blocktree_db.rs @@ -2,23 +2,20 @@ use crate::blocktree_meta; use bincode::{deserialize, serialize}; use byteorder::{BigEndian, ByteOrder}; use log::*; - +pub use rocksdb::Direction as IteratorDirection; +use rocksdb::{ + self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, + IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB, +}; use serde::de::DeserializeOwned; use serde::Serialize; - use solana_sdk::clock::Slot; -use std::borrow::Borrow; use std::collections::HashMap; use std::fs; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; -use rocksdb::{ - self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, - IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB, -}; - // A good value for this is the number of cores on the machine const TOTAL_THREADS: i32 = 8; const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB @@ -71,18 +68,18 @@ impl std::convert::From> for BlocktreeError } } +impl std::convert::From for BlocktreeError { + fn from(e: rocksdb::Error) -> BlocktreeError { + BlocktreeError::RocksDb(e) + } +} + pub enum IteratorMode { Start, End, From(Index, IteratorDirection), } -#[allow(dead_code)] -pub enum IteratorDirection { - Forward, - Reverse, -} - pub mod columns { #[derive(Debug)] /// SlotMeta Column @@ -205,26 +202,24 @@ impl Rocks { Ok(()) } - fn iterator_cf( + fn iterator_cf( &self, cf: ColumnFamily, - iterator_mode: IteratorMode<&[u8]>, - ) -> Result { - let iter = { - match iterator_mode { - IteratorMode::Start => self.0.iterator_cf(cf, RocksIteratorMode::Start)?, - IteratorMode::End => self.0.iterator_cf(cf, RocksIteratorMode::End)?, - IteratorMode::From(start_from, direction) => { - let rocks_direction = match direction { - IteratorDirection::Forward => Direction::Forward, - IteratorDirection::Reverse => Direction::Reverse, - }; - self.0 - .iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))? - } + iterator_mode: IteratorMode, + ) -> Result + where + C: Column, + { + let start_key; + let iterator_mode = match iterator_mode { + IteratorMode::From(start_from, direction) => { + start_key = C::key(start_from); + RocksIteratorMode::From(&start_key, direction) } + IteratorMode::Start => RocksIteratorMode::Start, + IteratorMode::End => RocksIteratorMode::End, }; - + let iter = self.0.iterator_cf(cf, iterator_mode)?; Ok(iter) } @@ -513,10 +508,7 @@ impl Database { where C: TypedColumn, { - if let Some(serialized_value) = self - .backend - .get_cf(self.cf_handle::(), C::key(key).borrow())? - { + if let Some(serialized_value) = self.backend.get_cf(self.cf_handle::(), &C::key(key))? { let value = deserialize(&serialized_value)?; Ok(Some(value)) @@ -525,31 +517,15 @@ impl Database { } } - pub fn iter( - &self, + pub fn iter<'a, C>( + &'a self, iterator_mode: IteratorMode, - ) -> Result)>> + ) -> Result)> + 'a> where C: Column, { - let iter = { - match iterator_mode { - IteratorMode::From(start_from, direction) => { - let key = C::key(start_from); - self.backend.iterator_cf( - self.cf_handle::(), - IteratorMode::From(key.borrow(), direction), - )? - } - IteratorMode::Start => self - .backend - .iterator_cf(self.cf_handle::(), IteratorMode::Start)?, - IteratorMode::End => self - .backend - .iterator_cf(self.cf_handle::(), IteratorMode::End)?, - } - }; - + let cf = self.cf_handle::(); + let iter = self.backend.iterator_cf::(cf, iterator_mode)?; Ok(iter.map(|(key, value)| (C::index(&key), value))) } @@ -615,27 +591,15 @@ where C: Column, { pub fn get_bytes(&self, key: C::Index) -> Result>> { - self.backend.get_cf(self.handle(), C::key(key).borrow()) + self.backend.get_cf(self.handle(), &C::key(key)) } - pub fn iter( - &self, + pub fn iter<'a>( + &'a self, iterator_mode: IteratorMode, - ) -> Result)>> { - let iter = { - match iterator_mode { - IteratorMode::From(start_from, direction) => { - let key = C::key(start_from); - self.backend - .iterator_cf(self.handle(), IteratorMode::From(key.borrow(), direction))? - } - IteratorMode::Start => self - .backend - .iterator_cf(self.handle(), IteratorMode::Start)?, - IteratorMode::End => self.backend.iterator_cf(self.handle(), IteratorMode::End)?, - } - }; - + ) -> Result)> + 'a> { + let cf = self.handle(); + let iter = self.backend.iterator_cf::(cf, iterator_mode)?; Ok(iter.map(|(key, value)| (C::index(&key), value))) } @@ -686,8 +650,7 @@ where } pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { - self.backend - .put_cf(self.handle(), C::key(key).borrow(), value) + self.backend.put_cf(self.handle(), &C::key(key), value) } } @@ -696,7 +659,7 @@ where C: TypedColumn, { pub fn get(&self, key: C::Index) -> Result> { - if let Some(serialized_value) = self.backend.get_cf(self.handle(), C::key(key).borrow())? { + if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? { let value = deserialize(&serialized_value)?; Ok(Some(value)) @@ -709,28 +672,28 @@ where let serialized_value = serialize(value)?; self.backend - .put_cf(self.handle(), C::key(key).borrow(), &serialized_value) + .put_cf(self.handle(), &C::key(key), &serialized_value) } } impl WriteBatch { pub fn put_bytes(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { self.write_batch - .put_cf(self.get_cf::(), C::key(key).borrow(), bytes) - .map_err(|e| e.into()) + .put_cf(self.get_cf::(), &C::key(key), bytes)?; + Ok(()) } pub fn delete(&mut self, key: C::Index) -> Result<()> { self.write_batch - .delete_cf(self.get_cf::(), C::key(key).borrow()) - .map_err(|e| e.into()) + .delete_cf(self.get_cf::(), &C::key(key))?; + Ok(()) } pub fn put(&mut self, key: C::Index, value: &C::Type) -> Result<()> { let serialized_value = serialize(&value)?; self.write_batch - .put_cf(self.get_cf::(), C::key(key).borrow(), &serialized_value) - .map_err(|e| e.into()) + .put_cf(self.get_cf::(), &C::key(key), &serialized_value)?; + Ok(()) } #[inline] @@ -739,12 +702,6 @@ impl WriteBatch { } } -impl std::convert::From for BlocktreeError { - fn from(e: rocksdb::Error) -> BlocktreeError { - BlocktreeError::RocksDb(e) - } -} - fn get_cf_options(name: &'static str) -> Options { use columns::{ErasureMeta, Index, ShredCode, ShredData};