This commit is contained in:
NikVolf 2016-10-15 13:06:52 +03:00
parent 8666aae12b
commit add9c72817
3 changed files with 73 additions and 35 deletions

View File

@ -8,7 +8,6 @@ use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
Options, DBCompactionStyle, BlockBasedOptions, Cache, Column};
use elastic_array::ElasticArray32;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::convert::From;
use super::Bytes;

View File

@ -20,3 +20,6 @@ pub enum BlockRef {
Number(u64),
Hash(primitives::hash::H256),
}
pub use storage::{Storage, Store};
pub use kvdb::Database;

View File

@ -1,6 +1,6 @@
//! Bitcoin storage
use kvdb::Database;
use kvdb::{Database, DatabaseConfig};
use primitives::hash::H256;
use super::{BlockRef, Bytes};
use byteorder::{LittleEndian, ByteOrder};
@ -15,12 +15,14 @@ const COL_BLOCK_HASHES: u32 = 1;
const COL_BLOCK_HEADERS: u32 = 2;
const COL_BLOCK_TRANSACTIONS: u32 = 3;
const COL_TRANSACTIONS: u32 = 4;
//const COL_RESERVED1: u32 = 5;
//const COL_RESERVED2: u32 = 6;
//const COL_RESERVED3: u32 = 7;
//const COL_RESERVED4: u32 = 8;
//const COL_RESERVED5: u32 = 9;
//const COL_RESERVED6: u32 = 10;
const _COL_RESERVED1: u32 = 5;
const _COL_RESERVED2: u32 = 6;
const _COL_RESERVED3: u32 = 7;
const _COL_RESERVED4: u32 = 8;
const _COL_RESERVED5: u32 = 9;
const _COL_RESERVED6: u32 = 10;
const DB_VERSION: u32 = 1;
pub trait Store {
fn block_hash(&self, number: u64) -> Option<H256>;
@ -34,16 +36,24 @@ pub trait Store {
fn block(&self, block_ref: BlockRef) -> Option<chain::Block>;
}
struct Storage {
pub struct Storage {
database: Database,
}
#[derive(Debug)]
pub enum MetaError {
NoVersion,
UnsupportedVersion,
}
/// Database error
pub enum Error {
/// Rocksdb error
DB(String),
/// Io error
Io(std::io::Error),
/// Invalid meta info
Meta(MetaError),
}
impl From<String> for Error {
@ -70,10 +80,21 @@ impl Storage {
// if no directory exists, it will be created
pub fn new<P: AsRef<Path>>(path: P) -> Result<Storage, Error> {
try!(fs::create_dir_all(path.as_ref()));
let cfg = DatabaseConfig::with_columns(Some(COL_COUNT));
let db = try!(Database::open(&cfg, &*path.as_ref().to_string_lossy()));
Ok(Storage {
database: try!(Database::open_default(&*path.as_ref().to_string_lossy())),
})
match try!(db.get(Some(COL_META), b"version")) {
Some(val) => {
let ver = LittleEndian::read_u32(&val);
if ver == DB_VERSION {
Ok(Storage { database: db, })
}
else {
Err(Error::Meta(MetaError::UnsupportedVersion))
}
},
_ => Err(Error::Meta(MetaError::NoVersion))
}
}
fn db_error(&self, msg: String) {
@ -97,6 +118,14 @@ impl Storage {
BlockRef::Hash(h) => Some(h),
}
}
fn block_transactions_by_hash(&self, h: &H256) -> Vec<H256> {
self.get(COL_BLOCK_TRANSACTIONS, &**h)
.unwrap_or(Vec::new())
.chunks(H256::size())
.map(H256::from)
.collect()
}
}
impl Store for Storage {
@ -111,11 +140,8 @@ impl Store for Storage {
fn block_transactions(&self, block_ref: BlockRef) -> Vec<H256> {
self.resolve_hash(block_ref)
.and_then(|h| self.get(COL_BLOCK_TRANSACTIONS, &*h))
.map(|h| self.block_transactions_by_hash(&h))
.unwrap_or(Vec::new())
.chunks(H256::size())
.map(H256::from)
.collect()
}
fn transaction_bytes(&self, hash: &H256) -> Option<Bytes> {
@ -123,28 +149,38 @@ impl Store for Storage {
}
fn block(&self, block_ref: BlockRef) -> Option<chain::Block> {
self.resolve_hash(block_ref)
//.and_then(|h| (self.get(COL_BLOCK_HEADERS, &*h), self.block_transactions(BlockRef::Hash(h))))
.and_then(|(header_bytes, transactions)| {
let reader = serialization::Reader::new(&header_bytes[..]);
let header = chain::BlockHeader::deserialize(&reader).ok().and_then(
|header| None
)
self.resolve_hash(block_ref).and_then(|block_hash|
self.get(COL_BLOCK_HEADERS, &*block_hash)
.and_then(|header_bytes| {
let transactions = self.block_transactions_by_hash(&block_hash)
.into_iter()
.filter_map(|tx_hash| {
self.transaction_bytes(&tx_hash).and_then(|tx_bytes| {
let mut reader = serialization::Reader::new(&tx_bytes[..]);
match chain::Transaction::deserialize(&mut reader) {
Ok(tx) => Some(tx),
Err(e) => {
self.db_error(format!("Error deserializing header, possible db corruption ({:?})", e));
None
}
}
})
})
.collect();
let mut reader = serialization::Reader::new(&header_bytes[..]);
let maybe_header = match chain::BlockHeader::deserialize(&mut reader) {
Ok(header) => Some(header),
Err(e) => {
self.db_error(format!("Error deserializing header, possible db corruption ({:?})", e));
None
}
};
maybe_header.map(|header| chain::Block::new(header, transactions))
})
)
}
}
//
// chain::Block::new(
// header,
// transactions.into_iter()
// .filter_map(|t_hash| {
// let maybe_bytes = self.transaction_bytes(t_hash);
// maybe_bytes.and_then(|tx_bytes| {
// let tx_reader = serialization::Reader::new(&tx_bytes[..]);
// chain::Transaction::deserialize(&tx_reader).ok()
// })
// })
// .collect()
#[cfg(test)]
mod tests {