Merge pull request #243 from ethcore/sync_headers_verification

Verify block headers before requesting blocks
This commit is contained in:
Nikolay Volf 2016-12-01 17:20:07 +03:00 committed by GitHub
commit e5027ea844
10 changed files with 224 additions and 69 deletions

View File

@ -3,19 +3,21 @@ use primitives::hash::H256;
use primitives::bytes::Bytes;
use chain;
pub trait BlockProvider {
/// resolves number by block hash
fn block_number(&self, hash: &H256) -> Option<u32>;
/// resolves hash by block number
fn block_hash(&self, number: u32) -> Option<H256>;
/// Read-only access to block headers, keyed by block reference (number or hash).
pub trait BlockHeaderProvider {
	/// resolves header bytes by block reference (number/hash)
	fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes>;
	/// resolves deserialized block header by block reference (number/hash)
	fn block_header(&self, block_ref: BlockRef) -> Option<chain::BlockHeader>;
}
pub trait BlockProvider: BlockHeaderProvider {
/// resolves number by block hash
fn block_number(&self, hash: &H256) -> Option<u32>;
/// resolves hash by block number
fn block_hash(&self, number: u32) -> Option<H256>;
/// resolves deserialized block body by block reference (number/hash)
fn block(&self, block_ref: BlockRef) -> Option<chain::Block>;
@ -30,5 +32,9 @@ pub trait BlockProvider {
/// returns all transactions in the block by block reference (number/hash)
fn block_transactions(&self, block_ref: BlockRef) -> Vec<chain::Transaction>;
}
/// Upcast helper: lets a `Store`-like object be borrowed as a plain
/// `BlockHeaderProvider` trait object (trait-object upcasting is not
/// automatic between supertraits).
pub trait AsBlockHeaderProvider {
	/// returns `BlockHeaderProvider`
	fn as_block_header_provider(&self) -> &BlockHeaderProvider;
}

View File

@ -69,7 +69,7 @@ pub use kvdb::Database;
pub use transaction_provider::{TransactionProvider, AsTransactionProvider, PreviousTransactionOutputProvider};
pub use transaction_meta_provider::TransactionMetaProvider;
pub use block_stapler::{BlockStapler, BlockInsertedChain};
pub use block_provider::BlockProvider;
pub use block_provider::{BlockProvider, BlockHeaderProvider, AsBlockHeaderProvider};
pub use indexed_block::{IndexedBlock, IndexedTransactions};
#[cfg(feature="dev")]

View File

@ -14,7 +14,7 @@ use transaction_meta::TransactionMeta;
use error::{Error, ConsistencyError, MetaError};
use update_context::UpdateContext;
use block_provider::BlockProvider;
use block_provider::{BlockProvider, BlockHeaderProvider, AsBlockHeaderProvider};
use transaction_provider::TransactionProvider;
use transaction_meta_provider::TransactionMetaProvider;
use block_stapler::{BlockStapler, BlockInsertedChain, Reorganization};
@ -38,7 +38,7 @@ const DB_VERSION: u32 = 1;
const MAX_FORK_ROUTE_PRESET: usize = 2048;
/// Blockchain storage interface
pub trait Store : BlockProvider + BlockStapler + TransactionProvider + TransactionMetaProvider {
pub trait Store : BlockProvider + BlockStapler + TransactionProvider + TransactionMetaProvider + AsBlockHeaderProvider {
/// get best block
fn best_block(&self) -> Option<BestBlock>;
@ -400,6 +400,24 @@ impl Storage {
}
}
impl BlockHeaderProvider for Storage {
	/// Fetches the raw serialized header for the referenced block from the
	/// headers column, resolving a number reference to a hash first.
	fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
		let hash = self.resolve_hash(block_ref);
		hash.and_then(|h| self.get(COL_BLOCK_HEADERS, &*h))
	}

	/// Fetches and deserializes the header for the referenced block.
	/// Panics on malformed bytes, which would indicate database corruption.
	fn block_header(&self, block_ref: BlockRef) -> Option<chain::BlockHeader> {
		self.block_header_bytes(block_ref).map(|bytes| {
			deserialize::<_, chain::BlockHeader>(bytes.as_ref())
				.expect("Error deserializing header, possible db corruption")
		})
	}
}
impl AsBlockHeaderProvider for Storage {
	/// Exposes this storage as a `BlockHeaderProvider` trait object.
	fn as_block_header_provider(&self) -> &BlockHeaderProvider {
		// `&Storage` coerces directly to the trait object
		self
	}
}
impl BlockProvider for Storage {
fn block_number(&self, hash: &H256) -> Option<u32> {
self.get(COL_BLOCK_NUMBERS, &**hash)
@ -411,16 +429,6 @@ impl BlockProvider for Storage {
.map(|val| H256::from(&**val))
}
fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
self.resolve_hash(block_ref).and_then(|h| self.get(COL_BLOCK_HEADERS, &*h))
}
fn block_header(&self, block_ref: BlockRef) -> Option<chain::BlockHeader> {
self.block_header_bytes(block_ref).map(
|bytes| deserialize::<_, chain::BlockHeader>(bytes.as_ref())
.expect("Error deserializing header, possible db corruption"))
}
fn block_transaction_hashes(&self, block_ref: BlockRef) -> Vec<H256> {
self.resolve_hash(block_ref)
.map(|h| self.block_transaction_hashes_by_hash(&h))

View File

@ -3,7 +3,7 @@
use super::{
BlockRef, Store, Error, BestBlock, BlockLocation, BlockInsertedChain, BlockProvider,
BlockStapler, TransactionMetaProvider, TransactionProvider, AsTransactionProvider,
IndexedBlock,
IndexedBlock, BlockHeaderProvider, AsBlockHeaderProvider,
};
use chain::{self, Block};
use primitives::hash::H256;
@ -68,18 +68,7 @@ impl TestStorage {
}
}
impl BlockProvider for TestStorage {
fn block_number(&self, hash: &H256) -> Option<u32> {
let data = self.data.read();
data.hashes.get(hash).cloned()
}
fn block_hash(&self, number: u32) -> Option<H256> {
let data = self.data.read();
data.heights.get(&number).cloned()
}
impl BlockHeaderProvider for TestStorage {
fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
let data = self.data.read();
self.resolve_hash(block_ref)
@ -93,6 +82,24 @@ impl BlockProvider for TestStorage {
.and_then(|ref h| data.blocks.get(h))
.map(|ref block| block.header().clone())
}
}
impl AsBlockHeaderProvider for TestStorage {
	/// Exposes the test storage as a `BlockHeaderProvider` trait object.
	fn as_block_header_provider(&self) -> &BlockHeaderProvider {
		// `&TestStorage` coerces directly to the trait object
		self
	}
}
impl BlockProvider for TestStorage {
fn block_number(&self, hash: &H256) -> Option<u32> {
let data = self.data.read();
data.hashes.get(hash).cloned()
}
fn block_hash(&self, number: u32) -> Option<H256> {
let data = self.data.read();
data.heights.get(&number).cloned()
}
fn block_transaction_hashes(&self, block_ref: BlockRef) -> Vec<H256> {
let data = self.data.read();

View File

@ -77,7 +77,7 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network);
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));

View File

@ -265,6 +265,7 @@ mod tests {
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
use network::Magic;
use db;
use super::LocalNode;
use test_data;
@ -313,7 +314,7 @@ mod tests {
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { threads_num: 1 };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Mainnet);
let mut verifier = DummyVerifier::default();
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);

View File

@ -212,6 +212,20 @@ impl Chain {
self.best_storage_block.clone()
}
	/// Get best block header
	///
	/// Returns the best block known to the headers chain (headers scheduled
	/// for verification on top of storage), falling back to the best stored
	/// block when no headers are queued.
	pub fn best_block_header(&self) -> db::BestBlock {
		let headers_chain_information = self.headers_chain.information();
		// no queued headers => the best header is the best stored block itself
		// NOTE(review): assumes `information().best` is the count of queued
		// headers above storage — confirm against headers_chain impl
		if headers_chain_information.best == 0 {
			return self.best_storage_block()
		}
		db::BestBlock {
			// queued headers extend the chain `best` blocks past storage
			number: self.best_storage_block.number + headers_chain_information.best,
			// last queued header is at index `best - 1`
			hash: self.headers_chain.at(headers_chain_information.best - 1)
				.expect("got this index above; qed")
				.hash(),
		}
	}
/// Get block header by hash
pub fn block_hash(&self, number: u32) -> Option<H256> {
if number <= self.best_storage_block.number {

View File

@ -7,14 +7,15 @@ use futures::{BoxFuture, Future, finished};
use futures::stream::Stream;
use tokio_core::reactor::{Handle, Interval};
use futures_cpupool::CpuPool;
use db::{self, IndexedBlock};
use db::{self, IndexedBlock, BlockHeaderProvider, BlockRef};
use chain::{BlockHeader, Transaction};
use message::types;
use message::common::{InventoryVector, InventoryType};
use primitives::hash::H256;
use primitives::bytes::Bytes;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
use synchronization_chain::{ChainRef, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
use synchronization_chain::{ChainRef, Chain, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use synchronization_executor::{Task, TaskExecutor};
@ -28,6 +29,8 @@ use synchronization_verifier::{Verifier, VerificationSink, VerificationTask};
use compact_block_builder::build_compact_block;
use hash_queue::HashPosition;
use miner::transaction_fee_rate;
use verification::ChainVerifier;
use network::Magic;
use time;
use std::time::Duration;
@ -285,6 +288,10 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
orphaned_blocks_pool: OrphanBlocksPool,
/// Orphaned transactions pool.
orphaned_transactions_pool: OrphanTransactionsPool,
/// Network config
network: Magic,
/// Verify block headers?
verify_headers: bool,
/// Verifying blocks by peer
verifying_blocks_by_peer: HashMap<H256, usize>,
/// Verifying blocks futures
@ -293,6 +300,18 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
do_not_relay: HashSet<H256>,
}
/// Block headers provider from `headers` message
pub struct MessageBlockHeadersProvider<'a> {
/// sync chain
chain: &'a Chain,
/// headers offset
first_header_number: u32,
/// headers by hash
headers: HashMap<H256, BlockHeader>,
/// headers by order
headers_order: Vec<H256>,
}
impl Config {
pub fn new() -> Self {
Config {
@ -609,17 +628,29 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
return;
}
// TODO: add full blocks headers validation here
// validate blocks headers before scheduling
let chain = self.chain.read();
let verifier = ChainVerifier::new(chain.storage(), self.network);
let mut block_header_provider = MessageBlockHeadersProvider::new(&*chain);
let mut blocks_hashes: Vec<H256> = Vec::with_capacity(blocks_headers.len());
let mut prev_block_hash = header0.previous_header_hash.clone();
for block_header in &blocks_headers {
let block_header_hash = block_header.hash();
// check that this header is direct child of previous header
if block_header.previous_header_hash != prev_block_hash {
warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash);
return;
}
// verify header
if self.verify_headers {
if let Err(error) = verifier.verify_block_header(&block_header_provider, &block_header_hash, &block_header) {
warn!(target: "sync", "Error verifying header {:?} from peer#{} `headers` message: {:?}", block_header_hash.to_reversed_str(), peer_index, error);
return;
}
}
block_header_provider.append_header(block_header_hash.clone(), block_header.clone());
blocks_hashes.push(block_header_hash.clone());
prev_block_hash = block_header_hash;
}
@ -948,7 +979,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
/// Create new synchronization client core
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef, network: Magic) -> Arc<Mutex<Self>> {
let sync = Arc::new(Mutex::new(
SynchronizationClientCore {
state: State::Saturated,
@ -959,6 +990,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
chain: chain.clone(),
orphaned_blocks_pool: OrphanBlocksPool::new(),
orphaned_transactions_pool: OrphanTransactionsPool::new(),
network: network,
verify_headers: true,
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_futures: HashMap::new(),
do_not_relay: HashSet::new(),
@ -1016,6 +1049,12 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Verify block headers or not?
#[cfg(test)]
pub fn verify_headers(&mut self, verify: bool) {
self.verify_headers = verify;
}
/// Relay new blocks
fn relay_new_blocks(&mut self, new_blocks_hashes: Vec<H256>) {
let tasks: Vec<_> = {
@ -1397,6 +1436,44 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
impl<'a> MessageBlockHeadersProvider<'a> {
	/// Builds a provider on top of `chain`; headers appended from the message
	/// are numbered starting right after the chain's current best header.
	pub fn new(chain: &'a Chain) -> Self {
		MessageBlockHeadersProvider {
			chain: chain,
			first_header_number: chain.best_block_header().number + 1,
			headers: HashMap::new(),
			headers_order: Vec::new(),
		}
	}

	/// Remembers a header from the message, preserving arrival order.
	pub fn append_header(&mut self, hash: H256, header: BlockHeader) {
		self.headers_order.push(hash.clone());
		self.headers.insert(hash, header);
	}
}
impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> {
	/// Serializes the resolved header back to bytes.
	fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
		use ser::serialize;
		self.block_header(block_ref).map(|h| serialize(&h))
	}

	/// Resolves a header from the sync chain first, falling back to the
	/// headers collected from the `headers` message.
	fn block_header(&self, block_ref: BlockRef) -> Option<BlockHeader> {
		match block_ref {
			BlockRef::Hash(h) => {
				let from_chain = self.chain.block_header_by_hash(&h);
				from_chain.or_else(|| self.headers.get(&h).cloned())
			},
			BlockRef::Number(n) => {
				let from_chain = self.chain.block_header_by_number(n);
				from_chain.or_else(|| {
					// message headers occupy numbers starting at first_header_number;
					// checked_sub rejects numbers below that range
					n.checked_sub(self.first_header_number)
						.and_then(|index| self.headers_order.get(index as usize))
						.map(|hash| self.headers[hash].clone())
				})
			},
		}
	}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
@ -1405,7 +1482,7 @@ pub mod tests {
use chain::{Block, Transaction};
use message::common::{InventoryVector, InventoryType};
use message::types;
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType};
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType, MessageBlockHeadersProvider};
use connection_filter::tests::*;
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
@ -1413,9 +1490,10 @@ pub mod tests {
use synchronization_verifier::tests::DummyVerifier;
use synchronization_server::ServerTaskIndex;
use primitives::hash::H256;
use network::Magic;
use p2p::event_loop;
use test_data;
use db;
use db::{self, BlockHeaderProvider};
use devtools::RandomTempPath;
fn create_disk_storage() -> db::SharedStore {
@ -1434,7 +1512,10 @@ pub mod tests {
let executor = DummyTaskExecutor::new();
let config = Config { threads_num: 1 };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Testnet);
{
client_core.lock().verify_headers(false);
}
let mut verifier = verifier.unwrap_or_default();
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);
@ -2364,4 +2445,26 @@ pub mod tests {
}
assert_eq!(tasks[2], Task::SendInventory(3, inventory));
}
#[test]
fn test_message_block_headers_provider() {
let storage = Arc::new(db::TestStorage::with_genesis_block());
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
let chain = chain.read();
let mut headers_provider = MessageBlockHeadersProvider::new(&*chain);
assert_eq!(headers_provider.block_header(db::BlockRef::Hash(test_data::genesis().hash())), Some(test_data::genesis().block_header));
assert_eq!(headers_provider.block_header(db::BlockRef::Number(0)), Some(test_data::genesis().block_header));
assert_eq!(headers_provider.block_header(db::BlockRef::Hash(H256::from(1))), None);
assert_eq!(headers_provider.block_header(db::BlockRef::Number(1)), None);
headers_provider.append_header(test_data::block_h1().hash(), test_data::block_h1().block_header);
assert_eq!(headers_provider.block_header(db::BlockRef::Hash(test_data::genesis().hash())), Some(test_data::genesis().block_header));
assert_eq!(headers_provider.block_header(db::BlockRef::Number(0)), Some(test_data::genesis().block_header));
assert_eq!(headers_provider.block_header(db::BlockRef::Hash(test_data::block_h1().hash())), Some(test_data::block_h1().block_header));
assert_eq!(headers_provider.block_header(db::BlockRef::Number(1)), Some(test_data::block_h1().block_header));
assert_eq!(headers_provider.block_header(db::BlockRef::Hash(H256::from(1))), None);
assert_eq!(headers_provider.block_header(db::BlockRef::Number(2)), None);
}
}

View File

@ -1,12 +1,13 @@
//! Bitcoin chain verifier
use std::collections::BTreeSet;
use db::{self, BlockLocation, PreviousTransactionOutputProvider};
use db::{self, BlockLocation, PreviousTransactionOutputProvider, BlockHeaderProvider};
use network::{Magic, ConsensusParams};
use script::Script;
use super::{Verify, VerificationResult, Chain, Error, TransactionError};
use {chain, utils};
use scoped_pool::Pool;
use primitives::hash::H256;
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
const COINBASE_MATURITY: u32 = 100; // confirmations required before a coinbase output may be spent
@ -238,6 +239,36 @@ impl ChainVerifier {
Ok(())
}
	/// Verifies a single block header against consensus rules that need only
	/// prior headers (no transactions): proof-of-work target, future-timestamp
	/// bound, and median-time-past. `block_header_provider` supplies ancestor
	/// headers, which may include not-yet-stored headers from the same message.
	pub fn verify_block_header(
		&self,
		block_header_provider: &BlockHeaderProvider,
		hash: &H256,
		header: &chain::BlockHeader
	) -> Result<(), Error> {
		// target difficulty threshold
		if !self.skip_pow && !utils::check_nbits(self.network.max_nbits(), hash, header.nbits) {
			return Err(Error::Pow);
		}

		// check if block timestamp is not far in the future
		if utils::age(header.time) < -BLOCK_MAX_FUTURE {
			return Err(Error::Timestamp);
		}

		// timestamp must be strictly greater than the median of the previous
		// headers' timestamps; None (not enough ancestors) skips the check
		if let Some(median_timestamp) = self.median_timestamp(block_header_provider, header) {
			if median_timestamp >= header.time {
				trace!(
					target: "verification", "median timestamp verification failed, median: {}, current: {}",
					median_timestamp,
					header.time
				);
				return Err(Error::Timestamp);
			}
		}

		Ok(())
	}
fn verify_block(&self, block: &db::IndexedBlock) -> VerificationResult {
use task::Task;
@ -248,26 +279,8 @@ impl ChainVerifier {
return Err(Error::Empty);
}
// target difficulty threshold
if !self.skip_pow && !utils::check_nbits(self.network.max_nbits(), &hash, block.header().nbits) {
return Err(Error::Pow);
}
// check if block timestamp is not far in the future
if utils::age(block.header().time) < -BLOCK_MAX_FUTURE {
return Err(Error::Timestamp);
}
if let Some(median_timestamp) = self.median_timestamp(block) {
if median_timestamp >= block.header().time {
trace!(
target: "verification", "median timestamp verification failed, median: {}, current: {}",
median_timestamp,
block.header().time
);
return Err(Error::Timestamp);
}
}
// block header checks
try!(self.verify_block_header(self.store.as_block_header_provider(), &hash, block.header()));
// todo: serialized_size function is at least suboptimal
let size = block.size();
@ -342,12 +355,12 @@ impl ChainVerifier {
}
}
fn median_timestamp(&self, block: &db::IndexedBlock) -> Option<u32> {
fn median_timestamp(&self, block_header_provider: &BlockHeaderProvider, header: &chain::BlockHeader) -> Option<u32> {
let mut timestamps = BTreeSet::new();
let mut block_ref = block.header().previous_header_hash.clone().into();
let mut block_ref = header.previous_header_hash.clone().into();
// TODO: optimize it, so it does not make 11 redundant queries each time
for _ in 0..11 {
let previous_header = match self.store.block_header(block_ref) {
let previous_header = match block_header_provider.block_header(block_ref) {
Some(h) => h,
None => { break; }
};

View File

@ -62,7 +62,10 @@ pub fn check_nbits(max_nbits: u32, hash: &H256, n_bits: u32) -> bool {
let mut nb = [0u8; 4];
BigEndian::write_u32(&mut nb, n_bits);
let shift = (nb[0] - 3) as usize; // total shift for mantissa
let shift = match nb[0].checked_sub(3) {
Some(v) => v,
None => return false,
} as usize; // total shift for mantissa
if shift >= 30 { return false; } // invalid shift