connected sync to verify_block_header

This commit is contained in:
Svyatoslav Nikolsky 2016-12-01 14:55:21 +03:00
parent 11e1f8d5c8
commit fac3864f45
5 changed files with 107 additions and 8 deletions

View File

@ -77,7 +77,7 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network);
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));

View File

@ -265,6 +265,7 @@ mod tests {
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
use network::Magic;
use db;
use super::LocalNode;
use test_data;
@ -313,7 +314,7 @@ mod tests {
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { threads_num: 1 };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Mainnet);
let mut verifier = DummyVerifier::default();
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);

View File

@ -212,6 +212,20 @@ impl Chain {
self.best_storage_block.clone()
}
/// Get best block header
pub fn best_block_header(&self) -> db::BestBlock {
let headers_chain_information = self.headers_chain.information();
if headers_chain_information.best == 0 {
return self.best_storage_block()
}
db::BestBlock {
number: self.best_storage_block.number + headers_chain_information.best,
hash: self.headers_chain.at(headers_chain_information.best - 1)
.expect("got this index above; qed")
.hash(),
}
}
/// Get block header by hash
pub fn block_hash(&self, number: u32) -> Option<H256> {
if number <= self.best_storage_block.number {

View File

@ -7,14 +7,15 @@ use futures::{BoxFuture, Future, finished};
use futures::stream::Stream;
use tokio_core::reactor::{Handle, Interval};
use futures_cpupool::CpuPool;
use db::{self, IndexedBlock};
use db::{self, IndexedBlock, BlockHeaderProvider, BlockRef};
use chain::{BlockHeader, Transaction};
use message::types;
use message::common::{InventoryVector, InventoryType};
use primitives::hash::H256;
use primitives::bytes::Bytes;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
use synchronization_chain::{ChainRef, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
use synchronization_chain::{ChainRef, Chain, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use synchronization_executor::{Task, TaskExecutor};
@ -28,6 +29,8 @@ use synchronization_verifier::{Verifier, VerificationSink, VerificationTask};
use compact_block_builder::build_compact_block;
use hash_queue::HashPosition;
use miner::transaction_fee_rate;
use verification::ChainVerifier;
use network::Magic;
use time;
use std::time::Duration;
@ -285,6 +288,10 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
orphaned_blocks_pool: OrphanBlocksPool,
/// Orphaned transactions pool.
orphaned_transactions_pool: OrphanTransactionsPool,
/// Network config
network: Magic,
/// Verify block headers?
verify_headers: bool,
/// Verifying blocks by peer
verifying_blocks_by_peer: HashMap<H256, usize>,
/// Verifying blocks futures
@ -293,6 +300,18 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
do_not_relay: HashSet<H256>,
}
/// Block headers provider from `headers` message
struct MessageBlockHeadersProvider<'a> {
/// sync chain
chain: &'a Chain,
/// headers offset
first_header_number: u32,
/// headers by hash
headers: HashMap<H256, BlockHeader>,
/// headers by order
headers_order: Vec<H256>,
}
impl Config {
pub fn new() -> Self {
Config {
@ -609,17 +628,29 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
return;
}
// TODO: add full blocks headers validation here
// validate blocks headers before scheduling
let chain = self.chain.read();
let verifier = ChainVerifier::new(chain.storage(), self.network);
let mut block_header_provider = MessageBlockHeadersProvider::new(&*chain);
let mut blocks_hashes: Vec<H256> = Vec::with_capacity(blocks_headers.len());
let mut prev_block_hash = header0.previous_header_hash.clone();
for block_header in &blocks_headers {
let block_header_hash = block_header.hash();
// check that this header is direct child of previous header
if block_header.previous_header_hash != prev_block_hash {
warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash);
return;
}
// verify header
if self.verify_headers {
if let Err(error) = verifier.verify_block_header(&block_header_provider, &block_header_hash, &block_header) {
warn!(target: "sync", "Error verifying header {:?} from peer#{} `headers` message: {:?}", block_header_hash.to_reversed_str(), peer_index, error);
return;
}
}
block_header_provider.append_header(block_header_hash.clone(), block_header.clone());
blocks_hashes.push(block_header_hash.clone());
prev_block_hash = block_header_hash;
}
@ -948,7 +979,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
/// Create new synchronization client core
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef, network: Magic) -> Arc<Mutex<Self>> {
let sync = Arc::new(Mutex::new(
SynchronizationClientCore {
state: State::Saturated,
@ -959,6 +990,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
chain: chain.clone(),
orphaned_blocks_pool: OrphanBlocksPool::new(),
orphaned_transactions_pool: OrphanTransactionsPool::new(),
network: network,
verify_headers: true,
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_futures: HashMap::new(),
do_not_relay: HashSet::new(),
@ -1016,6 +1049,12 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Verify block headers or not?
#[cfg(test)]
pub fn verify_headers(&mut self, verify: bool) {
self.verify_headers = verify;
}
/// Relay new blocks
fn relay_new_blocks(&mut self, new_blocks_hashes: Vec<H256>) {
let tasks: Vec<_> = {
@ -1397,6 +1436,44 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
impl<'a> MessageBlockHeadersProvider<'a> {
pub fn new(chain: &'a Chain) -> Self {
let first_header_number = chain.best_block_header().number + 1;
MessageBlockHeadersProvider {
chain: chain,
first_header_number: first_header_number,
headers: HashMap::new(),
headers_order: Vec::new(),
}
}
pub fn append_header(&mut self, hash: H256, header: BlockHeader) {
self.headers.insert(hash.clone(), header);
self.headers_order.push(hash);
}
}
impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> {
fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
use ser::serialize;
self.block_header(block_ref).map(|h| serialize(&h))
}
fn block_header(&self, block_ref: BlockRef) -> Option<BlockHeader> {
match block_ref {
BlockRef::Hash(h) => self.chain.block_header_by_hash(&h)
.or_else(|| self.headers.get(&h).cloned()),
BlockRef::Number(n) => self.chain.block_header_by_number(n)
.or_else(|| if n >= self.first_header_number && n - self.first_header_number < self.headers_order.len() as u32 {
let ref header_hash = self.headers_order[(n - self.first_header_number) as usize];
Some(self.headers[header_hash].clone())
} else {
None
}),
}
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
@ -1413,6 +1490,7 @@ pub mod tests {
use synchronization_verifier::tests::DummyVerifier;
use synchronization_server::ServerTaskIndex;
use primitives::hash::H256;
use network::Magic;
use p2p::event_loop;
use test_data;
use db;
@ -1434,7 +1512,10 @@ pub mod tests {
let executor = DummyTaskExecutor::new();
let config = Config { threads_num: 1 };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Testnet);
{
client_core.lock().verify_headers(false);
}
let mut verifier = verifier.unwrap_or_default();
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);

View File

@ -62,7 +62,10 @@ pub fn check_nbits(max_nbits: u32, hash: &H256, n_bits: u32) -> bool {
let mut nb = [0u8; 4];
BigEndian::write_u32(&mut nb, n_bits);
let shift = (nb[0] - 3) as usize; // total shift for mantissa
let shift = match nb[0].checked_sub(3) {
Some(v) => v,
None => return false,
} as usize; // total shift for mantissa
if shift >= 30 { return false; } // invalid shift