Merge pull request #43 from ethcore/db-up

Database upgrade
This commit is contained in:
Svyatoslav Nikolsky 2016-10-27 10:42:28 +00:00 committed by GitHub
commit 9fd98e9392
8 changed files with 231 additions and 39 deletions

7
Cargo.lock generated
View File

@ -51,6 +51,11 @@ name = "base58"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bit-vec"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bitcrypto"
version = "0.1.0"
@ -115,6 +120,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "db"
version = "0.1.0"
dependencies = [
"bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chain 0.1.0",
"elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -705,6 +711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6"
"checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096"
"checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83"
"checksum bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5b97c2c8e8bbb4251754f559df8af22fb264853c7d009084a576cdf12565089d"
"checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3"
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"

View File

@ -13,6 +13,7 @@ chain = { path = "../chain" }
serialization = { path = "../serialization" }
parking_lot = "0.3"
test-data = { path = "../test-data" }
bit-vec = "0.4"
[features]
dev = []

View File

@ -9,6 +9,7 @@ use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
use elastic_array::ElasticArray32;
use parking_lot::RwLock;
use primitives::bytes::Bytes;
use byteorder::{LittleEndian, ByteOrder};
/// Database error
pub enum Error {
@ -89,6 +90,20 @@ impl DBTransaction {
key: ekey,
});
}
/// Write u64
pub fn write_u64(&mut self, col: Option<u32>, key: &[u8], value: u64) {
let mut val = [0u8; 8];
LittleEndian::write_u64(&mut val, value);
self.put(col, key, &val);
}
/// Write u32
pub fn write_u32(&mut self, col: Option<u32>, key: &[u8], value: u32) {
let mut val = [0u8; 4];
LittleEndian::write_u32(&mut val, value);
self.put(col, key, &val);
}
}
enum KeyState {

View File

@ -7,6 +7,7 @@ extern crate primitives;
extern crate byteorder;
extern crate chain;
extern crate serialization;
extern crate bit_vec;
#[cfg(test)]
extern crate ethcore_devtools as devtools;
@ -17,9 +18,10 @@ mod kvdb;
mod storage;
#[cfg(feature="dev")]
mod test_storage;
mod transaction_meta;
pub enum BlockRef {
Number(u64),
Number(u32),
Hash(primitives::hash::H256),
}

View File

@ -2,13 +2,15 @@
use std::{self, fs};
use std::path::Path;
use kvdb::{Database, DatabaseConfig};
use kvdb::{DBTransaction, Database, DatabaseConfig};
use byteorder::{LittleEndian, ByteOrder};
use primitives::hash::H256;
use primitives::bytes::Bytes;
use super::BlockRef;
use serialization;
use chain::{self, RepresentH256};
use parking_lot::RwLock;
use transaction_meta::TransactionMeta;
const COL_COUNT: u32 = 10;
const COL_META: u32 = 0;
@ -16,7 +18,7 @@ const COL_BLOCK_HASHES: u32 = 1;
const COL_BLOCK_HEADERS: u32 = 2;
const COL_BLOCK_TRANSACTIONS: u32 = 3;
const COL_TRANSACTIONS: u32 = 4;
const _COL_RESERVED1: u32 = 5;
const COL_TRANSACTIONS_META: u32 = 5;
const _COL_RESERVED2: u32 = 6;
const _COL_RESERVED3: u32 = 7;
const _COL_RESERVED4: u32 = 8;
@ -28,10 +30,13 @@ const DB_VERSION: u32 = 1;
/// Blockchain storage interface
pub trait Store : Send + Sync {
/// get best block number
fn best_block_number(&self) -> Option<u64>;
fn best_block_number(&self) -> Option<u32>;
/// get best block hash
fn best_block_hash(&self) -> Option<H256>;
/// resolves hash by block number
fn block_hash(&self, number: u64) -> Option<H256>;
fn block_hash(&self, number: u32) -> Option<H256>;
/// resolves header bytes by block reference (number/hash)
fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes>;
@ -58,11 +63,21 @@ pub trait Store : Send + Sync {
/// insert block in the storage
fn insert_block(&self, block: &chain::Block) -> Result<(), Error>;
/// get transaction metadata
fn transaction_meta(&self, hash: &H256) -> Option<TransactionMeta>;
}
#[derive(Debug, Clone)]
pub struct BestBlock {
pub number: u32,
pub hash: H256,
}
/// Blockchain storage with rocksdb database
pub struct Storage {
database: Database,
best_block: RwLock<Option<BestBlock>>,
}
#[derive(Debug)]
@ -93,13 +108,15 @@ impl From<std::io::Error> for Error {
}
}
fn u64_key(num: u64) -> [u8; 8] {
let mut result = [0u8; 8];
LittleEndian::write_u64(&mut result, num);
fn u32_key(num: u32) -> [u8; 4] {
let mut result = [0u8; 4];
LittleEndian::write_u32(&mut result, num);
result
}
const KEY_VERSION: &'static[u8] = b"version";
const KEY_BEST_BLOCK_NUMBER: &'static[u8] = b"best_block_number";
const KEY_BEST_BLOCK_HASH: &'static[u8] = b"best_block_hash";
impl Storage {
@ -110,25 +127,45 @@ impl Storage {
let cfg = DatabaseConfig::with_columns(Some(COL_COUNT));
let db = try!(Database::open(&cfg, &*path.as_ref().to_string_lossy()));
match try!(db.get(Some(COL_META), KEY_VERSION)) {
Some(val) => {
let ver = LittleEndian::read_u32(&val);
if ver == DB_VERSION {
Ok(Storage { database: db, })
}
else {
Err(Error::Meta(MetaError::UnsupportedVersion))
let storage = Storage {
database: db,
best_block: RwLock::default(),
};
match storage.read_meta_u32(KEY_VERSION) {
Some(ver) => {
if ver != DB_VERSION {
return Err(Error::Meta(MetaError::UnsupportedVersion))
}
},
_ => {
let mut meta_transaction = db.transaction();
let mut ver_val = [0u8; 4];
LittleEndian::write_u32(&mut ver_val, DB_VERSION);
meta_transaction.put(Some(COL_META), KEY_VERSION, &ver_val);
try!(db.write(meta_transaction));
Ok(Storage { database: db, })
}
let mut meta_transaction = storage.database.transaction();
meta_transaction.write_u32(Some(COL_META), KEY_VERSION, DB_VERSION);
try!(storage.database.write(meta_transaction));
},
};
let best_number = storage.read_meta_u32(KEY_BEST_BLOCK_NUMBER);
let best_hash = storage.get(COL_META, KEY_BEST_BLOCK_HASH).map(|val| H256::from(&**val));
if best_number.is_some() && best_hash.is_some() {
*storage.best_block.write() = Some(
BestBlock {
number: best_number.expect("is_some() is checked above for block number"),
hash: best_hash.expect("is_some() is checked above for block hash"),
}
);
}
Ok(storage)
}
fn read_meta(&self, key: &[u8]) -> Option<Bytes> {
self.get(COL_META, key)
}
fn read_meta_u32(&self, key: &[u8]) -> Option<u32> {
self.read_meta(key).map(|val| LittleEndian::read_u32(&val))
}
/// is invoked on database non-fatal query errors
@ -183,15 +220,32 @@ impl Storage {
})
.collect()
}
/// update transactions metadata in the specified database transaction
fn update_transactions_meta(&self, db_transaction: &mut DBTransaction, accepted_txs: &[chain::Transaction]) {
for accepted_tx in accepted_txs.iter() {
for (input_idx, input) in accepted_tx.inputs.iter().enumerate() {
// todo : hard error when no meta?
let mut meta = self.transaction_meta(&input.previous_output.hash)
.unwrap_or(TransactionMeta::new(0, input_idx+1));
meta.note_used(input_idx);
db_transaction.put(Some(COL_TRANSACTIONS_META), &*input.previous_output.hash, &meta.to_bytes());
}
}
}
}
impl Store for Storage {
fn best_block_number(&self) -> Option<u64> {
unimplemented!()
fn best_block_number(&self) -> Option<u32> {
self.best_block.read().as_ref().map(|bb| bb.number)
}
fn block_hash(&self, number: u64) -> Option<H256> {
self.get(COL_BLOCK_HASHES, &u64_key(number))
fn best_block_hash(&self) -> Option<H256> {
self.best_block.read().as_ref().map(|h| h.hash.clone())
}
fn block_hash(&self, number: u32) -> Option<H256> {
self.get(COL_BLOCK_HASHES, &u32_key(number))
.map(|val| H256::from(&**val))
}
@ -233,10 +287,26 @@ impl Store for Storage {
}
fn insert_block(&self, block: &chain::Block) -> Result<(), Error> {
let mut best_block = self.best_block.write();
let block_hash = block.hash();
let new_best_hash = match best_block.as_ref().map(|bb| &bb.hash) {
Some(best_hash) if &block.header().previous_header_hash != best_hash => best_hash.clone(),
_ => block_hash.clone(),
};
let new_best_number = match best_block.as_ref().map(|b| b.number) {
Some(best_number) => {
if block.hash() == new_best_hash { best_number + 1 }
else { best_number }
},
None => 1,
};
let mut transaction = self.database.transaction();
let tx_space = block.transactions().len() * 32;
let block_hash = block.hash();
let mut tx_refs = Vec::with_capacity(tx_space);
for tx in block.transactions() {
let tx_hash = tx.hash();
@ -255,8 +325,17 @@ impl Store for Storage {
&serialization::serialize(block.header())
);
if best_block.as_ref().map(|b| b.number) != Some(new_best_number) {
self.update_transactions_meta(&mut transaction, &block.transactions()[1..]);
transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number);
}
transaction.put(Some(COL_META), KEY_BEST_BLOCK_HASH, std::ops::Deref::deref(&new_best_hash));
try!(self.database.write(transaction));
*best_block = Some(BestBlock { hash: new_best_hash, number: new_best_number });
Ok(())
}
@ -268,6 +347,11 @@ impl Store for Storage {
})
}
fn transaction_meta(&self, hash: &H256) -> Option<TransactionMeta> {
self.get(COL_TRANSACTIONS_META, &**hash).map(|val|
TransactionMeta::from_bytes(&val).unwrap_or_else(|e| panic!("Invalid transaction metadata: db corrupted? ({:?})", e))
)
}
}
#[cfg(test)]
@ -297,6 +381,34 @@ mod tests {
assert_eq!(loaded_block.hash(), block.hash());
}
#[test]
fn best_block_update() {
let path = RandomTempPath::create_dir();
let store = Storage::new(path.as_path()).unwrap();
let block: Block = test_data::block1();
store.insert_block(&block).unwrap();
assert_eq!(store.best_block_number(), Some(1));
}
#[test]
fn best_hash_update_fork() {
let path = RandomTempPath::create_dir();
let store = Storage::new(path.as_path()).unwrap();
let block: Block = test_data::block1();
store.insert_block(&block).unwrap();
let another_block: Block = test_data::block_h169();
store.insert_block(&another_block).unwrap();
// did not update because `another_block` is not child of `block`
assert_eq!(store.best_block_hash(), Some(block.hash()));
// number should not be update also
assert_eq!(store.best_block_number(), Some(1));
}
#[test]
fn load_transaction() {
let path = RandomTempPath::create_dir();
@ -308,7 +420,6 @@ mod tests {
let loaded_transaction = store.transaction(&tx1).unwrap();
assert_eq!(loaded_transaction.hash(), block.transactions()[0].hash());
}
}

View File

@ -9,6 +9,7 @@ use std::mem::replace;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use parking_lot::RwLock;
use transaction_meta::TransactionMeta;
#[derive(Default)]
pub struct TestStorage {
@ -54,11 +55,15 @@ impl TestStorage {
}
impl Store for TestStorage {
fn best_block_number(&self) -> Option<u64> {
self.data.read().best_block_number.map(|b| b as u64)
fn best_block_number(&self) -> Option<u32> {
self.data.read().best_block_number.map(|b| b as u32)
}
fn block_hash(&self, number: u64) -> Option<H256> {
fn best_block_hash(&self) -> Option<H256> {
unimplemented!()
}
fn block_hash(&self, number: u32) -> Option<H256> {
let data = self.data.read();
data.heights.get(&(number as usize)).cloned()
}
@ -127,4 +132,8 @@ impl Store for TestStorage {
Ok(())
}
fn transaction_meta(&self, _hash: &H256) -> Option<TransactionMeta> {
unimplemented!();
}
}

View File

@ -0,0 +1,47 @@
//! Transaction index
use bit_vec::BitVec;
use byteorder::{LittleEndian, ByteOrder};
/// structure for indexing transaction info
pub struct TransactionMeta {
block_height: u32,
spent: BitVec,
}
#[derive(Debug)]
pub enum Error {
KeyTooShort(usize),
}
impl TransactionMeta {
/// new transaction description for indexing
pub fn new(block_height: u32, outputs: usize) -> Self {
TransactionMeta {
block_height: block_height,
spent: BitVec::from_elem(outputs, false),
}
}
/// note that particular output has been used
pub fn note_used(&mut self, index: usize) {
self.spent.set(index, true);
}
pub fn to_bytes(self) -> Vec<u8> {
let mut result = vec![0u8; 4];
LittleEndian::write_u32(&mut result[0..4], self.block_height);
result.extend(self.spent.to_bytes());
result
}
pub fn from_bytes(bytes: &[u8]) -> Result<TransactionMeta, Error> {
if bytes.len() <= 4 { return Err(Error::KeyTooShort(bytes.len())); }
Ok(TransactionMeta {
block_height: LittleEndian::read_u32(&bytes[0..4]),
spent: BitVec::from_bytes(&bytes[5..]),
})
}
}

View File

@ -142,7 +142,7 @@ impl Chain {
scheduled: self.hash_chain.len_of(SCHEDULED_QUEUE) as u64,
requested: self.hash_chain.len_of(REQUESTED_QUEUE) as u64,
verifying: self.hash_chain.len_of(VERIFYING_QUEUE) as u64,
stored: self.storage.best_block_number().map_or(0, |number| number + 1),
stored: self.storage.best_block_number().map_or(0, |number| number as u64 + 1),
}
}
@ -161,11 +161,11 @@ impl Chain {
let storage_best_block_number = self.storage.best_block_number().expect("storage with genesis block is required");
match self.hash_chain.back() {
Some(hash) => BestBlock {
height: storage_best_block_number + self.hash_chain.len() as u64,
height: storage_best_block_number as u64 + self.hash_chain.len() as u64,
hash: hash.clone(),
},
None => BestBlock {
height: storage_best_block_number,
height: storage_best_block_number as u64,
hash: self.storage.block_hash(storage_best_block_number).expect("storage with genesis block is required"),
}
}
@ -209,8 +209,8 @@ impl Chain {
// calculate for storage
let storage_best_block_number = self.storage.best_block_number().expect("storage with genesis block is required");
let storage_index = if storage_best_block_number < local_index { 0 } else { storage_best_block_number - local_index };
self.block_locator_hashes_for_storage(storage_index, step, &mut block_locator_hashes);
let storage_index = if (storage_best_block_number as u64) < local_index { 0 } else { (storage_best_block_number as u64) - local_index };
self.block_locator_hashes_for_storage(storage_index as u64, step, &mut block_locator_hashes);
block_locator_hashes
}
@ -255,7 +255,7 @@ impl Chain {
trace!(target: "sync", "Cannot push block {:?} to verification queue: {:?}", hash, err);
unimplemented!();
},
_ => (),
_ => (),
}
self.verification_queue.process();
match self.verification_queue.pop_valid() {
@ -309,7 +309,7 @@ impl Chain {
/// Calculate block locator hashes for storage
fn block_locator_hashes_for_storage(&self, mut index: u64, mut step: u64, hashes: &mut Vec<H256>) {
loop {
let block_hash = self.storage.block_hash(index)
let block_hash = self.storage.block_hash(index as u32)
.expect("private function; index calculated in `block_locator_hashes`; qed");
hashes.push(block_hash);