update to upstream rocksdb

This commit is contained in:
Andronik Ordian 2019-04-11 11:53:29 +02:00
parent 05449fdb89
commit 90b47c1594
No known key found for this signature in database
GPG Key ID: C66F3C680DE0E6ED
2 changed files with 98 additions and 70 deletions

View File

@ -15,7 +15,7 @@ log = "0.4"
num_cpus = "1.0"
parking_lot = "0.7"
regex = "1.0"
parity-rocksdb = "0.5"
rocksdb = "0.12.1"
[dev-dependencies]
tempdir = "0.3"

View File

@ -20,7 +20,7 @@ extern crate log;
extern crate fs_swap;
extern crate num_cpus;
extern crate regex;
extern crate parity_rocksdb;
extern crate rocksdb;
#[cfg(test)]
extern crate ethereum_types;
@ -30,9 +30,9 @@ extern crate kvdb;
use std::{cmp, fs, io, result, error};
use std::path::Path;
use parity_rocksdb::{
DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
Options, BlockBasedOptions, Direction, Cache, Column, ReadOptions
use rocksdb::{
DB, WriteBatch, WriteOptions, IteratorMode, DBIterator, Options, Error,
BlockBasedOptions, Direction, ColumnFamily, ColumnFamilyDescriptor, ReadOptions,
};
use kvdb::{
@ -57,6 +57,7 @@ fn other_io_err<E>(e: E) -> io::Error where E: Into<Box<error::Error + Send + Sy
const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128;
const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
const CF_HANDLE_PROOF: &'static str = "rocksdb opens a cf_handle for each cfname; qed";
/// Compaction profile for the database settings
@ -195,30 +196,35 @@ impl Default for DatabaseConfig {
/// The open RocksDB handle together with per-column bookkeeping and the
/// option objects the database was opened with (kept so columns can be
/// added/dropped later with a consistent configuration).
pub struct DBAndColumns {
	db: DB,
	// Column family names ("col0", "col1", ...). After the upstream-rocksdb
	// migration only the names are stored; `ColumnFamily` handles are
	// re-resolved from `db` on demand instead of being cached here.
	cf_names: Vec<String>,
	path: String,
	write_opts: WriteOptions,
	read_opts: ReadOptions,
	block_opts: BlockBasedOptions,
}

// SAFETY: NOTE(review) — these impls assert the wrapped rocksdb handle and
// option objects are usable across threads; confirm against the rocksdb
// crate's thread-safety guarantees for this version.
unsafe impl Send for DBAndColumns {}
unsafe impl Sync for DBAndColumns {}
// get column family configuration from database config.
//
// Builds the per-column `Options`: block-based table settings from
// `block_opts`, level-style compaction sized by the configured memory
// budget, the initial target file size, and compression disabled on
// every level (empty `compression_per_level`).
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result<Options> {
	let mut opts = Options::default();

	// TODO: add to upstream
	// opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?;
	opts.set_block_based_table_factory(block_opts);
	// TODO: add to upstream (pin_l0_filter_and_index_blocks_in_cache)
	// opts.set_parsed_options(
	// 	&format!("block_based_table_factory={{{};{}}}",
	// 		"cache_index_and_filter_blocks=true",
	// 		"pin_l0_filter_and_index_blocks_in_cache=true")).map_err(other_io_err)?;

	opts.optimize_level_style_compaction(config.memory_budget_per_col());
	opts.set_target_file_size_base(config.compaction.initial_file_size);
	// Empty slice disables compression on all levels.
	opts.set_compression_per_level(&[]);

	Ok(opts)
}
@ -226,72 +232,80 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Re
/// Key-Value database.
pub type Database = DatabaseWithCache<DBAndColumns>;
impl OpenHandler<DBAndColumns> for DBAndColumns {
type Config = DatabaseConfig;
// TODO: fix upstream to take options as a ref for DB::repair
impl DBAndColumns {
fn open(config: &Self::Config, path: &str) -> io::Result<DBAndColumns> {
let mut opts = Options::new();
fn options(config: &DatabaseConfig) -> Options {
let mut opts = Options::default();
if let Some(rate_limit) = config.compaction.write_rate_limit {
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?;
if let Some(_rate_limit) = config.compaction.write_rate_limit {
// TODO: add to upstream
// opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?;
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?;
opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?;
opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2);
opts.set_keep_log_file_num(1);
opts.set_bytes_per_sync(1048576);
opts.set_write_buffer_size(config.memory_budget_per_col() / 2);
opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2));
let mut block_opts = BlockBasedOptions::new();
opts
}
}
impl OpenHandler<DBAndColumns> for DBAndColumns {
type Config = DatabaseConfig;
fn open(config: &Self::Config, path: &str) -> io::Result<DBAndColumns> {
let opts = Self::options(config);
let mut block_opts = BlockBasedOptions::default();
{
block_opts.set_block_size(config.compaction.block_size);
let cache_size = cmp::max(8, config.memory_budget() / 3);
let cache = Cache::new(cache_size);
block_opts.set_cache(cache);
let cache_size = cmp::max(8 * 1024 * 1024, config.memory_budget() / 3);
block_opts.set_lru_cache(cache_size);
}
// attempt database repair if it has been previously marked as corrupted
let db_corrupted = Path::new(path).join(CORRUPTION_FILE_NAME);
if db_corrupted.exists() {
warn!("DB has been previously marked as corrupted, attempting repair");
DB::repair(&opts, path).map_err(other_io_err)?;
let opts2 = Self::options(config);
DB::repair(opts2, path).map_err(other_io_err)?;
fs::remove_file(db_corrupted)?;
}
let columns = config.columns.unwrap_or(0) as usize;
let mut cf_descriptors = Vec::with_capacity(columns);
let mut cf_options = Vec::with_capacity(columns);
let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
let cf_names: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cf_names.iter().map(|n| n as &str).collect();
for _ in 0 .. config.columns.unwrap_or(0) {
for name in &cf_names {
cf_descriptors.push(ColumnFamilyDescriptor::new(name.clone(), col_config(&config, &block_opts)?));
// TODO: avoid calling col_config twice (fix upstream)
cf_options.push(col_config(&config, &block_opts)?);
}
let write_opts = WriteOptions::new();
let mut read_opts = ReadOptions::new();
read_opts.set_verify_checksums(false);
let write_opts = WriteOptions::default();
let read_opts = ReadOptions::default();
// TODO: add to upstream
// read_opts.set_verify_checksums(false);
let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(_) => {
match DB::open_cf(&opts, path, &cfnames, &cf_options) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
Ok(db)
}
match DB::open_cf_descriptors(&opts, path, cf_descriptors) {
db @ Ok(_) => db,
Err(_) => {
// retry and create CFs
match DB::open_cf(&opts, path, &[], &[]) {
let names: &[&str] = &[];
match DB::open_cf(&opts, path, names) {
Ok(mut db) => {
cfs = cfnames.iter()
.enumerate()
.map(|(i, n)| db.create_cf(n, &cf_options[i]))
.collect::<::std::result::Result<_, _>>()
.map_err(other_io_err)?;
for (i, n)in cfnames.iter().enumerate() {
let _ = db.create_cf(n, &cf_options[i]).map_err(other_io_err)?;
}
Ok(db)
},
err => err,
@ -304,16 +318,20 @@ impl OpenHandler<DBAndColumns> for DBAndColumns {
let db = match db {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
Err(ref s) if is_corrupted(&s.clone().into_string()) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path).map_err(other_io_err)?;
let opts2 = Self::options(config);
DB::repair(opts2, path).map_err(other_io_err)?;
match cfnames.is_empty() {
true => DB::open(&opts, path).map_err(other_io_err)?,
false => {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?;
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
// TODO: fix upstream to take cf_descriptors as refs
let cf_descriptors: Vec<_> = cfnames.iter()
.zip(cf_options)
.map(|(name, option)| ColumnFamilyDescriptor::new(name.clone(), option))
.collect();
let db = DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)?;
db
},
}
@ -325,7 +343,8 @@ impl OpenHandler<DBAndColumns> for DBAndColumns {
Ok(DBAndColumns{
db,
cfs,
// TODO: avoid clone
cf_names: cf_names.clone(),
path: path.to_owned(),
write_opts,
read_opts,
@ -345,7 +364,7 @@ struct RocksDBWriteTransaction<'a> {
path: &'a str,
db: &'a DB,
write_opts: &'a WriteOptions,
cfs: &'a [Column],
cfs: Vec<ColumnFamily<'a>>,
}
impl<'a> WriteTransaction for RocksDBWriteTransaction<'a> {
@ -377,7 +396,7 @@ impl<'a> WriteTransaction for RocksDBWriteTransaction<'a> {
struct RocksDBReadTransaction<'a> {
db: &'a DB,
read_opts: &'a ReadOptions,
cfs: &'a [Column],
cfs: Vec<ColumnFamily<'a>>,
}
impl<'a> ReadTransaction for RocksDBReadTransaction<'a> {
@ -392,14 +411,20 @@ impl<'a> ReadTransaction for RocksDBReadTransaction<'a> {
}
}
impl DBAndColumns {
	/// Resolve a fresh `ColumnFamily` handle for every known column name,
	/// in `cf_names` order.
	fn column_families(&self) -> Vec<ColumnFamily> {
		let mut handles = Vec::with_capacity(self.cf_names.len());
		for name in &self.cf_names {
			handles.push(self.db.cf_handle(name).expect(CF_HANDLE_PROOF));
		}
		handles
	}
}
impl TransactionHandler for DBAndColumns {
/// Start a write transaction: a fresh batch plus freshly resolved
/// column-family handles borrowed from this database.
fn write_transaction<'a>(&'a self) -> Box<WriteTransaction + 'a> {
	Box::new(RocksDBWriteTransaction {
		batch: WriteBatch::default(),
		db: &self.db,
		path: &self.path,
		write_opts: &self.write_opts,
		cfs: self.column_families(),
	})
}
@ -407,20 +432,21 @@ impl TransactionHandler for DBAndColumns {
Box::new(RocksDBReadTransaction {
db: &self.db,
read_opts: &self.read_opts,
cfs: &self.cfs[..],
cfs: self.column_families(),
})
}
}
impl<'a> IterationHandler for &'a DBAndColumns {
type Iterator = DBIterator;
type Iterator = DBIterator<'a>;
fn iter(&self, c: usize) -> Self::Iterator {
if c > 0 {
let cfs = self.column_families();
self.db.iterator_cf_opt(
self.cfs[c - 1],
IteratorMode::Start,
cfs[c - 1],
&self.read_opts,
IteratorMode::Start,
).expect("iterator params are valid; qed")
} else {
self.db.iterator_opt(IteratorMode::Start, &self.read_opts)
@ -429,10 +455,11 @@ impl<'a> IterationHandler for &'a DBAndColumns {
fn iter_from_prefix(&self, c: usize, prefix: & [u8]) -> Self::Iterator {
if c > 0 {
let cfs = self.column_families();
self.db.iterator_cf_opt(
self.cfs[c - 1],
IteratorMode::From(prefix, Direction::Forward),
cfs[c - 1],
&self.read_opts,
IteratorMode::From(prefix, Direction::Forward),
).expect("iterator params are valid; qed")
} else {
self.db.iterator_opt(
@ -444,9 +471,9 @@ impl<'a> IterationHandler for &'a DBAndColumns {
}
#[inline]
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> io::Result<T> {
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, Error>) -> io::Result<T> {
if let Err(ref s) = res {
if s.starts_with("Corruption:") {
if s.clone().into_string().starts_with("Corruption:") {
warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
let _ = fs::File::create(path.as_ref().join(CORRUPTION_FILE_NAME));
}
@ -461,14 +488,14 @@ fn is_corrupted(s: &str) -> bool {
impl NumColumns for DBAndColumns {
	/// Number of column families this database was opened with.
	fn num_columns(&self) -> usize {
		// `len()` is already 0 for an empty vec; the previous
		// `if is_empty { 0 } else { len }` branch was redundant.
		self.cf_names.len()
	}
}
impl MigrationHandler<DBAndColumns> for DBAndColumns {
fn drop_column(&mut self) -> io::Result<()> {
if let Some(col) = self.cfs.pop() {
let name = format!("col{}", self.cfs.len());
if let Some(col) = self.cf_names.pop() {
let name = format!("col{}", self.cf_names.len());
drop(col);
self.db.drop_cf(&name).map_err(other_io_err)?;
}
@ -476,9 +503,10 @@ impl MigrationHandler<DBAndColumns> for DBAndColumns {
}
/// Create the next column family ("colN") with the standard per-column
/// configuration and record its name.
fn add_column(&mut self, config: &<DBAndColumns as OpenHandler<DBAndColumns>>::Config) -> io::Result<()> {
	// Next index follows the existing "colN" naming scheme.
	let col = self.cf_names.len() as u32;
	let name = format!("col{}", col);
	// Create the CF in the live DB first; only record the name once
	// creation succeeded, so `cf_names` never references a missing CF.
	let _ = self.db.create_cf(&name, &col_config(config, &self.block_opts)?).map_err(other_io_err)?;
	self.cf_names.push(name);
	Ok(())
}
}