Merge pull request #334 from ethcore/store-flush

Update kvdb to the proper flushing version from parity
Svyatoslav Nikolsky, 2016-12-15 01:30:55 +03:00 (committed by GitHub)
commit 3dfa1a9028
4 changed files with 125 additions and 120 deletions
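
The change replaces the old write-through kvdb with parity's buffered version: transactions staged with write_buffered land in an in-memory overlay, and flush() swaps that overlay into a second buffer and commits it to RocksDB as a single WriteBatch, guarded against concurrent or previously-failed flushes. A minimal usage sketch follows; it is hedged, not taken from this commit: DBTransaction::new and write_buffered are assumed to match the parity kvdb API of this era, while put, get and flush appear in the diff below.

	// Sketch only: demo() is a hypothetical caller; DBTransaction::new() and
	// Database::write_buffered() are assumed from parity's kvdb of this period.
	fn demo(db: &Database) -> Result<(), String> {
		let mut tr = DBTransaction::new();
		tr.put(Some(0), b"key", b"value");  // staged as DBOp::Insert
		db.write_buffered(tr);              // enters the in-memory overlay only

		// get() checks the overlay (and the flushing buffer) before RocksDB
		assert!(try!(db.get(Some(0), b"key")).is_some());

		db.flush()                          // swap overlay -> flushing, write one batch
	}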

Cargo.lock (generated)

@@ -177,7 +177,7 @@ dependencies = [
 "bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "chain 0.1.0",
-"elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
+"elastic-array 0.6.0 (git+https://github.com/ethcore/elastic-array)",
 "ethcore-devtools 1.3.0",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -214,8 +214,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 [[package]]
 name = "elastic-array"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
+version = "0.6.0"
+source = "git+https://github.com/ethcore/elastic-array#346f1ba5982576dab9d0b8fa178b50e1db0a21cd"
+dependencies = [
+ "heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
+]
 
 [[package]]
 name = "env_logger"
@@ -1231,7 +1234,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 "checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
 "checksum domain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "725459994103308a8476a95d8115280b1359dccc06ca14291df75f37459a9e30"
 "checksum dtoa 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0dd841b58510c9618291ffa448da2e4e0f699d984d436122372f446dae62263d"
-"checksum elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4bc9250a632e7c001b741eb0ec6cee93c9a5b6d5f1879696a4b94d62b012210a"
+"checksum elastic-array 0.6.0 (git+https://github.com/ethcore/elastic-array)" = "<none>"
 "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
 "checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "<none>"
 "checksum futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0bad0a2ac64b227fdc10c254051ae5af542cf19c9328704fd4092f7914196897"

db/Cargo.toml

@@ -4,7 +4,7 @@ version = "0.1.0"
 authors = ["Nikolay Volf <nikvolf@gmail.com>"]
 
 [dependencies]
-elastic-array = "0.5"
+elastic-array = { git = "https://github.com/ethcore/elastic-array" }
 rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" }
 ethcore-devtools = { path = "../devtools" }
 primitives = { path = "../primitives" }

db/src/kvdb.rs

@@ -1,39 +1,22 @@
 //! Key-Value store abstraction with `RocksDB` backend.
 
-use std::{self, fs, mem};
-use std::io::ErrorKind;
-use std::collections::HashMap;
-use std::path::PathBuf;
+use std::mem;
+use parking_lot::{Mutex, MutexGuard, RwLock};
+use elastic_array::*;
+use std::default::Default;
 use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
-	Options, DBCompactionStyle, BlockBasedOptions, Cache, Column};
-use elastic_array::ElasticArray32;
-use parking_lot::RwLock;
-use primitives::bytes::Bytes;
+	Options, DBCompactionStyle, BlockBasedOptions, Cache, Column, ReadOptions};
+use std::collections::HashMap;
 use byteorder::{LittleEndian, ByteOrder};
-//use std::path::Path;
-
-/// Database error
-pub enum Error {
-	/// Rocksdb error
-	DB(String),
-	/// Io error
-	Io(std::io::Error),
-}
-
-impl From<String> for Error {
-	fn from(err: String) -> Error {
-		Error::DB(err)
-	}
-}
-
-impl From<std::io::Error> for Error {
-	fn from(err: std::io::Error) -> Error {
-		Error::Io(err)
-	}
-}
 
 const DB_BACKGROUND_FLUSHES: i32 = 2;
 const DB_BACKGROUND_COMPACTIONS: i32 = 2;
 
+type Bytes = Vec<u8>;
+pub type DBValue = ElasticArray128<u8>;
+
 /// Write transaction. Batches a sequence of put/delete operations for efficiency.
 pub struct DBTransaction {
 	ops: Vec<DBOp>,
@@ -44,7 +27,7 @@ enum DBOp {
 	Insert {
 		col: Option<u32>,
 		key: ElasticArray32<u8>,
-		value: Bytes,
+		value: DBValue,
 	},
 	Delete {
 		col: Option<u32>,
@@ -68,7 +51,7 @@ impl DBTransaction {
 		self.ops.push(DBOp::Insert {
 			col: col,
 			key: ekey,
-			value: value.to_vec().into(),
+			value: DBValue::from_slice(value),
 		});
 	}
@@ -79,7 +62,7 @@ impl DBTransaction {
 		self.ops.push(DBOp::Insert {
 			col: col,
 			key: ekey,
-			value: value,
+			value: DBValue::from_vec(value),
 		});
 	}
@@ -93,7 +76,6 @@ impl DBTransaction {
 		});
 	}
 
-	/// Write u64
 	pub fn write_u64(&mut self, col: Option<u32>, key: &[u8], value: u64) {
 		let mut val = [0u8; 8];
 		LittleEndian::write_u64(&mut val, value);
@@ -119,12 +101,12 @@ impl DBTransaction {
 }
 
 enum KeyState {
-	Insert(Bytes),
+	Insert(DBValue),
 	Delete,
 }
 
 /// Compaction profile for the database settings
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, PartialEq, Debug)]
 pub struct CompactionProfile {
 	/// L0-L1 target file size
 	pub initial_file_size: u64,
@@ -137,16 +119,22 @@ pub struct CompactionProfile {
 impl Default for CompactionProfile {
 	/// Default profile suitable for most storage
 	fn default() -> CompactionProfile {
+		CompactionProfile::ssd()
+	}
+}
+
+impl CompactionProfile {
+	/// Default profile suitable for SSD storage
+	pub fn ssd() -> CompactionProfile {
 		CompactionProfile {
 			initial_file_size: 32 * 1024 * 1024,
 			file_size_multiplier: 2,
 			write_rate_limit: None,
 		}
 	}
-}
 
-impl CompactionProfile {
-	/// Slow hdd compaction profile
+	/// Slow HDD compaction profile
 	pub fn hdd() -> CompactionProfile {
 		CompactionProfile {
 			initial_file_size: 192 * 1024 * 1024,
@@ -219,10 +207,15 @@ struct DBAndColumns {
 /// Key-Value database.
 pub struct Database {
 	db: RwLock<Option<DBAndColumns>>,
-	config: DatabaseConfig,
 	write_opts: WriteOptions,
+	read_opts: ReadOptions,
+	// Dirty values added with `write_buffered`. Cleaned on `flush`.
 	overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
-	path: String,
+	// Values currently being flushed. Cleared when `flush` completes.
+	flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
+	// Prevents concurrent flushes.
+	// Value indicates if a flush is in progress.
+	flushing_lock: Mutex<bool>,
 }
 
 impl Database {
@@ -241,6 +234,7 @@ impl Database {
 			try!(opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)));
 		}
 		try!(opts.set_parsed_options(&format!("max_total_wal_size={}", 64 * 1024 * 1024)));
+		try!(opts.set_parsed_options("verify_checksums_in_compaction=0"));
 		opts.set_max_open_files(config.max_open_files);
 		opts.create_if_missing(true);
 		opts.set_use_fsync(false);
@@ -254,6 +248,8 @@ impl Database {
 		opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
 
 		let mut cf_options = Vec::with_capacity(config.columns.unwrap_or(0) as usize);
+		let cfnames: Vec<_> = (0..config.columns.unwrap_or(0)).map(|c| format!("col{}", c)).collect();
+		let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
 
 		for col in 0 .. config.columns.unwrap_or(0) {
 			let mut opts = Options::new();
@@ -278,15 +274,16 @@ impl Database {
 		if !config.wal {
 			write_opts.disable_wal(true);
 		}
+		let mut read_opts = ReadOptions::new();
+		read_opts.set_verify_checksums(false);
 
 		let mut cfs: Vec<Column> = Vec::new();
 		let db = match config.columns {
 			Some(columns) => {
-				let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
-				let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
 				match DB::open_cf(&opts, path, &cfnames, &cf_options) {
 					Ok(db) => {
-						cfs = cfnames.iter().map(|n| db.cf_handle(n).unwrap()).collect();
+						cfs = cfnames.iter().map(|n| db.cf_handle(n)
+							.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
 						assert!(cfs.len() == columns as usize);
 						Ok(db)
 					}
@@ -294,31 +291,39 @@ impl Database {
 						// retry and create CFs
 						match DB::open_cf(&opts, path, &[], &[]) {
 							Ok(mut db) => {
-								cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i]).unwrap()).collect();
+								cfs = try!(cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect());
 								Ok(db)
 							},
-							err => err,
+							err @ Err(_) => err,
 						}
 					}
 				}
 			},
 			None => DB::open(&opts, path)
 		};
 		let db = match db {
 			Ok(db) => db,
 			Err(ref s) if s.starts_with("Corruption:") => {
+				info!("{}", s);
+				info!("Attempting DB repair for {}", path);
 				try!(DB::repair(&opts, path));
-				try!(DB::open(&opts, path))
+
+				match cfnames.is_empty() {
+					true => try!(DB::open(&opts, path)),
+					false => try!(DB::open_cf(&opts, path, &cfnames, &cf_options))
+				}
 			},
 			Err(s) => { return Err(s); }
 		};
 		let num_cols = cfs.len();
 		Ok(Database {
 			db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })),
-			config: config.clone(),
 			write_opts: write_opts,
 			overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
-			path: path.to_owned(),
+			flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
+			flushing_lock: Mutex::new((false)),
+			read_opts: read_opts,
 		})
 	}
@@ -350,40 +355,60 @@ impl Database {
 		};
 	}
 
-	/// Commit buffered changes to database.
-	pub fn flush(&self) -> Result<(), String> {
+	/// Commit buffered changes to database. Must be called under `flush_lock`
+	fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<(), String> {
 		match *self.db.read() {
 			Some(DBAndColumns { ref db, ref cfs }) => {
 				let batch = WriteBatch::new();
-				let mut overlay = self.overlay.write();
-
-				for (c, column) in overlay.iter_mut().enumerate() {
-					let column_data = mem::replace(column, HashMap::new());
-					for (key, state) in column_data {
-						match state {
-							KeyState::Delete => {
-								if c > 0 {
-									try!(batch.delete_cf(cfs[c - 1], &key));
-								} else {
-									try!(batch.delete(&key));
-								}
-							},
-							KeyState::Insert(value) => {
-								if c > 0 {
-									try!(batch.put_cf(cfs[c - 1], &key, &value));
-								} else {
-									try!(batch.put(&key, &value));
-								}
-							},
+				mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
+				{
+					for (c, column) in self.flushing.read().iter().enumerate() {
+						for (ref key, ref state) in column.iter() {
+							match **state {
+								KeyState::Delete => {
+									if c > 0 {
+										try!(batch.delete_cf(cfs[c - 1], &key));
+									} else {
+										try!(batch.delete(&key));
+									}
+								},
+								KeyState::Insert(ref value) => {
+									if c > 0 {
+										try!(batch.put_cf(cfs[c - 1], &key, value));
+									} else {
+										try!(batch.put(&key, &value));
+									}
+								},
+							}
 						}
 					}
 				}
-				db.write_opt(batch, &self.write_opts)
+				try!(db.write_opt(batch, &self.write_opts));
+				for column in self.flushing.write().iter_mut() {
+					column.clear();
+					column.shrink_to_fit();
+				}
+				Ok(())
 			},
 			None => Err("Database is closed".to_owned())
 		}
 	}
 
+	/// Commit buffered changes to database.
+	pub fn flush(&self) -> Result<(), String> {
+		let mut lock = self.flushing_lock.lock();
+		// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
+		// The value inside the lock is used to detect that.
+		if *lock {
+			// This can only happen if another flushing thread is terminated unexpectedly.
+			return Err("Database write failure. Running low on memory perhaps?".to_owned());
+		}
+		*lock = true;
+		let result = self.write_flushing_with_lock(&mut lock);
+		*lock = false;
+		result
+	}
+
 	/// Commit transaction to database.
 	pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
 		match *self.db.read() {
@@ -407,7 +432,7 @@ impl Database {
 	}
 
 	/// Get value by key.
-	pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> {
+	pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
 		match *self.db.read() {
 			Some(DBAndColumns { ref db, ref cfs }) => {
 				let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
@@ -415,9 +440,16 @@ impl Database {
 					Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
 					Some(&KeyState::Delete) => Ok(None),
 					None => {
-						col.map_or_else(
-							|| db.get(key).map(|r| r.map(|v| v.to_vec().into())),
-							|c| db.get_cf(cfs[c as usize], key).map(|r| r.map(|v| v.to_vec().into())))
+						let flushing = &self.flushing.read()[Self::to_overlay_column(col)];
+						match flushing.get(key) {
+							Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
+							Some(&KeyState::Delete) => Ok(None),
+							None => {
+								col.map_or_else(
+									|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))),
+									|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))))
+							},
+						}
 					},
 				}
 			},
@@ -430,57 +462,26 @@ impl Database {
 		//TODO: iterate over overlay
 		match *self.db.read() {
 			Some(DBAndColumns { ref db, ref cfs }) => {
-				col.map_or_else(|| DatabaseIterator { iter: db.iterator(IteratorMode::Start) },
-					|c| DatabaseIterator { iter: db.iterator_cf(cfs[c as usize], IteratorMode::Start).unwrap() })
+				col.map_or_else(|| DatabaseIterator { iter: db.iterator_opt(IteratorMode::Start, &self.read_opts) },
+					|c| DatabaseIterator { iter: db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
+						.expect("iterator params are valid; qed") })
 			},
 			None => panic!("Not supported yet") //TODO: return an empty iterator or change return type
 		}
 	}
 
 	/// Close the database
-	fn close(&self) {
+	pub fn close(&self) {
 		*self.db.write() = None;
 		self.overlay.write().clear();
+		self.flushing.write().clear();
 	}
+}
 
-	/// Restore the database from a copy at given path.
-	pub fn restore(&self, new_db: &str) -> Result<(), Error> {
-		self.close();
-
-		let mut backup_db = PathBuf::from(&self.path);
-		backup_db.pop();
-		backup_db.push("backup_db");
-
-		let existed = match fs::rename(&self.path, &backup_db) {
-			Ok(_) => true,
-			Err(e) => if let ErrorKind::NotFound = e.kind() {
-				false
-			} else {
-				return Err(e.into());
-			}
-		};
-
-		match fs::rename(&new_db, &self.path) {
-			Ok(_) => {
-				// clean up the backup.
-				if existed {
-					try!(fs::remove_dir_all(&backup_db));
-				}
-			}
-			Err(e) => {
-				// restore the backup.
-				if existed {
-					try!(fs::rename(&backup_db, &self.path));
-				}
-				return Err(e.into())
-			}
-		}
-
-		// reopen the database and steal handles into self
-		let db = try!(Self::open(&self.config, &self.path));
-		*self.db.write() = mem::replace(&mut *db.db.write(), None);
-		*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new());
-		Ok(())
+impl Drop for Database {
+	fn drop(&mut self) {
+		// write all buffered changes if we can.
+		let _ = self.flush();
 	}
 }
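
The core of the new kvdb is this double-buffered overlay: flush() swaps overlay into flushing under flushing_lock, writes the flushing buffer as one batch, then clears it, while get() falls through overlay, then flushing, then RocksDB. A standalone sketch of that lookup order, with simplified stand-in types rather than the actual implementation:

	use std::collections::HashMap;

	// Simplified stand-ins for the diff's KeyState and buffer maps.
	enum KeyState { Insert(Vec<u8>), Delete }

	// Lookup order used by Database::get above: overlay, then flushing, then disk.
	fn lookup(
		overlay: &HashMap<Vec<u8>, KeyState>,
		flushing: &HashMap<Vec<u8>, KeyState>,
		disk: impl Fn(&[u8]) -> Option<Vec<u8>>,
		key: &[u8],
	) -> Option<Vec<u8>> {
		for buffer in [overlay, flushing] {
			match buffer.get(key) {
				Some(&KeyState::Insert(ref v)) => return Some(v.clone()),
				Some(&KeyState::Delete) => return None, // tombstone shadows older state
				None => {}
			}
		}
		disk(key)
	}

Checking both buffers matters because a value swapped into flushing is not yet on disk; skipping that step would make reads racy against an in-progress flush.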

db/src/storage.rs

@@ -180,6 +180,7 @@ impl Storage {
 	/// get the value of the key in the database
 	fn get(&self, col: u32, key: &[u8]) -> Option<Bytes> {
 		self.database.get(Some(col), key).expect("fatal db error")
+			.map(|val| val.to_vec().into())
 	}
 
 	/// resolves hash for the block reference (which can be referenced by number or
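
The storage layer keeps its Vec<u8>-backed Bytes API: since Database::get now returns DBValue (an ElasticArray128<u8>), the extra .map(|val| val.to_vec().into()) copies the value out at the boundary. The equivalent adapter, spelled out as a sketch (dbvalue_to_bytes is hypothetical; Bytes is assumed to implement From<Vec<u8>>, as in the primitives crate):

	// Assumption: Bytes implements From<Vec<u8>> (primitives crate convention).
	fn dbvalue_to_bytes(val: DBValue) -> Bytes {
		let vec: Vec<u8> = val.to_vec(); // copy out of the ElasticArray-backed buffer
		vec.into()
	}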