Cleanup blocktree (#6508)

* Cut down on liberal use of borrow()

* No need to map_err(Into::into)

* Group From instances

* Remove Direction indirection

* Let rustfmt order imports

* Better copypasta

* Cleanup copypasta

* Add explicit lifetimes so that it doesn't get pegged to 'static when we upgrade rocksdb

* Remove redundant type aliases
This commit is contained in:
Greg Fitzgerald 2019-10-23 17:13:21 -06:00 committed by GitHub
parent f1172617cc
commit 955d0ab76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 99 deletions

View File

@ -1,7 +1,10 @@
//! The `blocktree` module provides functions for parallel verification of the //! The `blocktree` module provides functions for parallel verification of the
//! Proof of History ledger as well as iterative read, append write, and random //! Proof of History ledger as well as iterative read, append write, and random
//! access read to a persistent file-based ledger. //! access read to a persistent file-based ledger.
use crate::blocktree_db::{self, columns as cf, Column, IteratorDirection, IteratorMode}; use crate::blocktree_db::{
columns as cf, BatchProcessor, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
WriteBatch,
};
pub use crate::blocktree_db::{BlocktreeError, Result}; pub use crate::blocktree_db::{BlocktreeError, Result};
pub use crate::blocktree_meta::SlotMeta; pub use crate::blocktree_meta::SlotMeta;
use crate::blocktree_meta::*; use crate::blocktree_meta::*;
@ -30,11 +33,6 @@ use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
type Database = blocktree_db::Database;
type LedgerColumn<C> = blocktree_db::LedgerColumn<C>;
type WriteBatch = blocktree_db::WriteBatch;
type BatchProcessor = blocktree_db::BatchProcessor;
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb"; pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@ -239,7 +237,10 @@ impl Blocktree {
self.orphans_cf.get(slot) self.orphans_cf.get(slot)
} }
pub fn slot_meta_iterator(&self, slot: u64) -> Result<impl Iterator<Item = (u64, SlotMeta)>> { pub fn slot_meta_iterator<'a>(
&'a self,
slot: u64,
) -> Result<impl Iterator<Item = (u64, SlotMeta)> + 'a> {
let meta_iter = self let meta_iter = self
.db .db
.iter::<cf::SlotMeta>(IteratorMode::From(slot, IteratorDirection::Forward))?; .iter::<cf::SlotMeta>(IteratorMode::From(slot, IteratorDirection::Forward))?;
@ -252,10 +253,10 @@ impl Blocktree {
})) }))
} }
pub fn slot_data_iterator( pub fn slot_data_iterator<'a>(
&self, &'a self,
slot: u64, slot: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> { ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
let slot_iterator = self let slot_iterator = self
.db .db
.iter::<cf::ShredData>(IteratorMode::From((slot, 0), IteratorDirection::Forward))?; .iter::<cf::ShredData>(IteratorMode::From((slot, 0), IteratorDirection::Forward))?;

View File

@ -2,23 +2,20 @@ use crate::blocktree_meta;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use log::*; use log::*;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use std::borrow::Borrow;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs; use std::fs;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
// A good value for this is the number of cores on the machine // A good value for this is the number of cores on the machine
const TOTAL_THREADS: i32 = 8; const TOTAL_THREADS: i32 = 8;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
@ -71,18 +68,18 @@ impl std::convert::From<std::boxed::Box<bincode::ErrorKind>> for BlocktreeError
} }
} }
impl std::convert::From<rocksdb::Error> for BlocktreeError {
fn from(e: rocksdb::Error) -> BlocktreeError {
BlocktreeError::RocksDb(e)
}
}
pub enum IteratorMode<Index> { pub enum IteratorMode<Index> {
Start, Start,
End, End,
From(Index, IteratorDirection), From(Index, IteratorDirection),
} }
#[allow(dead_code)]
pub enum IteratorDirection {
Forward,
Reverse,
}
pub mod columns { pub mod columns {
#[derive(Debug)] #[derive(Debug)]
/// SlotMeta Column /// SlotMeta Column
@ -205,26 +202,24 @@ impl Rocks {
Ok(()) Ok(())
} }
fn iterator_cf( fn iterator_cf<C>(
&self, &self,
cf: ColumnFamily, cf: ColumnFamily,
iterator_mode: IteratorMode<&[u8]>, iterator_mode: IteratorMode<C::Index>,
) -> Result<DBIterator> { ) -> Result<DBIterator>
let iter = { where
match iterator_mode { C: Column,
IteratorMode::Start => self.0.iterator_cf(cf, RocksIteratorMode::Start)?, {
IteratorMode::End => self.0.iterator_cf(cf, RocksIteratorMode::End)?, let start_key;
IteratorMode::From(start_from, direction) => { let iterator_mode = match iterator_mode {
let rocks_direction = match direction { IteratorMode::From(start_from, direction) => {
IteratorDirection::Forward => Direction::Forward, start_key = C::key(start_from);
IteratorDirection::Reverse => Direction::Reverse, RocksIteratorMode::From(&start_key, direction)
};
self.0
.iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
}
} }
IteratorMode::Start => RocksIteratorMode::Start,
IteratorMode::End => RocksIteratorMode::End,
}; };
let iter = self.0.iterator_cf(cf, iterator_mode)?;
Ok(iter) Ok(iter)
} }
@ -513,10 +508,7 @@ impl Database {
where where
C: TypedColumn, C: TypedColumn,
{ {
if let Some(serialized_value) = self if let Some(serialized_value) = self.backend.get_cf(self.cf_handle::<C>(), &C::key(key))? {
.backend
.get_cf(self.cf_handle::<C>(), C::key(key).borrow())?
{
let value = deserialize(&serialized_value)?; let value = deserialize(&serialized_value)?;
Ok(Some(value)) Ok(Some(value))
@ -525,31 +517,15 @@ impl Database {
} }
} }
pub fn iter<C>( pub fn iter<'a, C>(
&self, &'a self,
iterator_mode: IteratorMode<C::Index>, iterator_mode: IteratorMode<C::Index>,
) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)>> ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + 'a>
where where
C: Column, C: Column,
{ {
let iter = { let cf = self.cf_handle::<C>();
match iterator_mode { let iter = self.backend.iterator_cf::<C>(cf, iterator_mode)?;
IteratorMode::From(start_from, direction) => {
let key = C::key(start_from);
self.backend.iterator_cf(
self.cf_handle::<C>(),
IteratorMode::From(key.borrow(), direction),
)?
}
IteratorMode::Start => self
.backend
.iterator_cf(self.cf_handle::<C>(), IteratorMode::Start)?,
IteratorMode::End => self
.backend
.iterator_cf(self.cf_handle::<C>(), IteratorMode::End)?,
}
};
Ok(iter.map(|(key, value)| (C::index(&key), value))) Ok(iter.map(|(key, value)| (C::index(&key), value)))
} }
@ -615,27 +591,15 @@ where
C: Column, C: Column,
{ {
pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> { pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
self.backend.get_cf(self.handle(), C::key(key).borrow()) self.backend.get_cf(self.handle(), &C::key(key))
} }
pub fn iter( pub fn iter<'a>(
&self, &'a self,
iterator_mode: IteratorMode<C::Index>, iterator_mode: IteratorMode<C::Index>,
) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)>> { ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + 'a> {
let iter = { let cf = self.handle();
match iterator_mode { let iter = self.backend.iterator_cf::<C>(cf, iterator_mode)?;
IteratorMode::From(start_from, direction) => {
let key = C::key(start_from);
self.backend
.iterator_cf(self.handle(), IteratorMode::From(key.borrow(), direction))?
}
IteratorMode::Start => self
.backend
.iterator_cf(self.handle(), IteratorMode::Start)?,
IteratorMode::End => self.backend.iterator_cf(self.handle(), IteratorMode::End)?,
}
};
Ok(iter.map(|(key, value)| (C::index(&key), value))) Ok(iter.map(|(key, value)| (C::index(&key), value)))
} }
@ -686,8 +650,7 @@ where
} }
pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> {
self.backend self.backend.put_cf(self.handle(), &C::key(key), value)
.put_cf(self.handle(), C::key(key).borrow(), value)
} }
} }
@ -696,7 +659,7 @@ where
C: TypedColumn, C: TypedColumn,
{ {
pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> { pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
if let Some(serialized_value) = self.backend.get_cf(self.handle(), C::key(key).borrow())? { if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
let value = deserialize(&serialized_value)?; let value = deserialize(&serialized_value)?;
Ok(Some(value)) Ok(Some(value))
@ -709,28 +672,28 @@ where
let serialized_value = serialize(value)?; let serialized_value = serialize(value)?;
self.backend self.backend
.put_cf(self.handle(), C::key(key).borrow(), &serialized_value) .put_cf(self.handle(), &C::key(key), &serialized_value)
} }
} }
impl WriteBatch { impl WriteBatch {
pub fn put_bytes<C: Column>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { pub fn put_bytes<C: Column>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
self.write_batch self.write_batch
.put_cf(self.get_cf::<C>(), C::key(key).borrow(), bytes) .put_cf(self.get_cf::<C>(), &C::key(key), bytes)?;
.map_err(|e| e.into()) Ok(())
} }
pub fn delete<C: Column>(&mut self, key: C::Index) -> Result<()> { pub fn delete<C: Column>(&mut self, key: C::Index) -> Result<()> {
self.write_batch self.write_batch
.delete_cf(self.get_cf::<C>(), C::key(key).borrow()) .delete_cf(self.get_cf::<C>(), &C::key(key))?;
.map_err(|e| e.into()) Ok(())
} }
pub fn put<C: TypedColumn>(&mut self, key: C::Index, value: &C::Type) -> Result<()> { pub fn put<C: TypedColumn>(&mut self, key: C::Index, value: &C::Type) -> Result<()> {
let serialized_value = serialize(&value)?; let serialized_value = serialize(&value)?;
self.write_batch self.write_batch
.put_cf(self.get_cf::<C>(), C::key(key).borrow(), &serialized_value) .put_cf(self.get_cf::<C>(), &C::key(key), &serialized_value)?;
.map_err(|e| e.into()) Ok(())
} }
#[inline] #[inline]
@ -739,12 +702,6 @@ impl WriteBatch {
} }
} }
impl std::convert::From<rocksdb::Error> for BlocktreeError {
fn from(e: rocksdb::Error) -> BlocktreeError {
BlocktreeError::RocksDb(e)
}
}
fn get_cf_options(name: &'static str) -> Options { fn get_cf_options(name: &'static str) -> Options {
use columns::{ErasureMeta, Index, ShredCode, ShredData}; use columns::{ErasureMeta, Index, ShredCode, ShredData};