Merge pull request #67 from paritytech/verify_headers_in_separate_thread

Verify headers in separate thread
This commit is contained in:
Svyatoslav Nikolsky 2019-04-03 08:15:49 +03:00 committed by GitHub
commit d93ba52f2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 277 additions and 273 deletions

View File

@ -6,9 +6,9 @@ use storage;
use network::ConsensusParams;
use primitives::hash::H256;
use super::Error;
use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask,
use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask, HeadersVerificationSink,
VerificationSink, BlockVerificationSink, TransactionVerificationSink};
use types::StorageRef;
use types::{PeerIndex, StorageRef};
use utils::OrphanBlocksPool;
use VerificationParameters;
@ -141,6 +141,16 @@ impl TransactionVerificationSink for BlocksWriterSink {
}
}
impl HeadersVerificationSink for BlocksWriterSink {
fn on_headers_verification_success(&self, _headers: Vec<chain::IndexedBlockHeader>) {
unreachable!("not intended to verify headers")
}
fn on_headers_verification_error(&self, _peer: PeerIndex, _err: String, _hash: H256) {
unreachable!("not intended to verify headers")
}
}
#[cfg(test)]
mod tests {
extern crate test_data;

View File

@ -58,7 +58,7 @@ pub enum Error {
Verification(String),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
/// Verification parameters.
pub struct VerificationParameters {
/// Blocks verification level.
@ -111,13 +111,15 @@ pub fn create_local_sync_node(consensus: ConsensusParams, db: storage::SharedSto
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(db.clone()));
let sync_chain = SyncChain::new(db.clone(), memory_pool.clone());
let chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
let light_chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
let heavy_chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
let sync_executor = SyncExecutor::new(peers.clone());
let sync_server = Arc::new(ServerImpl::new(peers.clone(), db.clone(), memory_pool.clone(), sync_executor.clone()));
let sync_client_core = SynchronizationClientCore::new(sync_client_config, sync_state.clone(), peers.clone(), sync_executor.clone(), sync_chain, chain_verifier.clone());
let sync_client_core = SynchronizationClientCore::new(sync_client_config, sync_state.clone(), peers.clone(), sync_executor.clone(), sync_chain);
let verifier_sink = Arc::new(CoreVerificationSink::new(sync_client_core.clone()));
let verifier = AsyncVerifier::new(chain_verifier, db.clone(), memory_pool.clone(), verifier_sink, verification_params);
let sync_client = SynchronizationClient::new(sync_state.clone(), sync_client_core, verifier);
let light_verifier = AsyncVerifier::new(light_chain_verifier, db.clone(), memory_pool.clone(), verifier_sink.clone(), verification_params.clone());
let heavy_verifier = AsyncVerifier::new(heavy_chain_verifier, db.clone(), memory_pool.clone(), verifier_sink, verification_params);
let sync_client = SynchronizationClient::new(sync_state.clone(), sync_client_core, light_verifier, heavy_verifier);
Arc::new(SyncNode::new(consensus, db, memory_pool, peers, sync_state, sync_client, sync_server))
}

View File

@ -282,7 +282,6 @@ pub mod tests {
use synchronization_server::tests::DummyServer;
use synchronization_verifier::tests::DummyVerifier;
use primitives::bytes::Bytes;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
use std::iter::repeat;
use synchronization_peers::PeersImpl;
use utils::SynchronizationState;
@ -312,14 +311,15 @@ pub mod tests {
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { close_connection_on_bad_block: true };
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), ConsensusParams::new(Network::Mainnet)));
let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier);
let mut verifier = match verifier {
let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain);
let mut light_verifier = DummyVerifier::default();
light_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
let mut heavy_verifier = match verifier {
Some(verifier) => verifier,
None => DummyVerifier::default(),
};
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
let client = SynchronizationClient::new(sync_state.clone(), client_core, verifier);
heavy_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
let client = SynchronizationClient::new(sync_state.clone(), client_core, light_verifier, heavy_verifier);
let local_node = LocalNode::new(ConsensusParams::new(Network::Mainnet), storage, memory_pool, sync_peers, sync_state, client, server.clone());
(executor, server, local_node)
}

View File

@ -135,14 +135,16 @@ pub trait Client : Send + Sync + 'static {
/// Synchronization client facade
pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
/// Verification mutex
verification_lock: Mutex<()>,
/// Shared client state
shared_state: SynchronizationStateRef,
/// Client core
core: ClientCoreRef<SynchronizationClientCore<T>>,
/// Verifier
verifier: U,
/// Verification mutex
heavy_verification_lock: Mutex<()>,
/// Verifier that performs heavy verifications (blocks during sync + transactions).
heavy_verifier: U,
/// Verifier that performs lightweight verifications (headers during sync).
light_verifier: U,
}
impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
@ -159,7 +161,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
}
fn on_headers(&self, peer_index: PeerIndex, message: types::Headers) {
self.core.lock().on_headers(peer_index, message);
let headers_to_verify = self.core.lock().on_headers(peer_index, message);
if let Some(headers_to_verify) = headers_to_verify {
self.light_verifier.verify_headers(peer_index, headers_to_verify);
}
}
fn on_block(&self, peer_index: PeerIndex, block: IndexedBlock) {
@ -169,13 +174,13 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
{
// verification tasks must be scheduled in the same order as they were built in on_block
// => here we use verification_lock for this
let _verification_lock = self.verification_lock.lock();
let _verification_lock = self.heavy_verification_lock.lock();
let blocks_to_verify = self.core.lock().on_block(peer_index, block);
// verify blocks
if let Some(mut blocks_to_verify) = blocks_to_verify {
while let Some(block) = blocks_to_verify.pop_front() {
self.verifier.verify_block(block);
self.heavy_verifier.verify_block(block);
}
}
}
@ -201,7 +206,7 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
// => we should verify blocks we mine
let next_block_height = self.shared_state.best_storage_block_height() + 1;
while let Some(tx) = transactions_to_verify.pop_front() {
self.verifier.verify_transaction(next_block_height, tx);
self.heavy_verifier.verify_transaction(next_block_height, tx);
}
}
}
@ -219,7 +224,7 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
let next_block_height = self.shared_state.best_storage_block_height() + 1;
while let Some(tx) = transactions_to_verify.pop_front() {
self.verifier.verify_transaction(next_block_height, tx);
self.heavy_verifier.verify_transaction(next_block_height, tx);
}
Ok(())
}
@ -231,12 +236,18 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
impl<T, U> SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
/// Create new synchronization client
pub fn new(shared_state: SynchronizationStateRef, core: ClientCoreRef<SynchronizationClientCore<T>>, verifier: U) -> Arc<Self> {
pub fn new(
shared_state: SynchronizationStateRef,
core: ClientCoreRef<SynchronizationClientCore<T>>,
light_verifier: U,
heavy_verifier: U,
) -> Arc<Self> {
Arc::new(SynchronizationClient {
verification_lock: Mutex::new(()),
shared_state: shared_state,
core: core,
verifier: verifier,
light_verifier: light_verifier,
heavy_verification_lock: Mutex::new(()),
heavy_verifier: heavy_verifier,
})
}
}

View File

@ -10,14 +10,14 @@ use message::types;
use message::common::{InventoryType, InventoryVector};
use miner::transaction_fee_rate;
use primitives::hash::H256;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
use synchronization_chain::{Chain, BlockState, TransactionState, BlockInsertionResult};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::ManagementWorker;
use synchronization_peers_tasks::PeersTasks;
use synchronization_verifier::{VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask};
use synchronization_verifier::{VerificationSink, HeadersVerificationSink, BlockVerificationSink,
TransactionVerificationSink, VerificationTask};
use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef};
use utils::{AverageSpeedMeter, MessageBlockHeadersProvider, OrphanBlocksPool, OrphanTransactionsPool, HashPosition};
use utils::{AverageSpeedMeter, OrphanBlocksPool, OrphanTransactionsPool, HashPosition};
#[cfg(test)] use synchronization_peers_tasks::{Information as PeersTasksInformation};
#[cfg(test)] use synchronization_chain::{Information as ChainInformation};
@ -67,7 +67,7 @@ pub trait ClientCore {
fn on_connect(&mut self, peer_index: PeerIndex);
fn on_disconnect(&mut self, peer_index: PeerIndex);
fn on_inventory(&self, peer_index: PeerIndex, message: types::Inv);
fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers);
fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) -> Option<Vec<IndexedBlockHeader>>;
fn on_block(&mut self, peer_index: PeerIndex, block: IndexedBlock) -> Option<VecDeque<IndexedBlock>>;
fn on_transaction(&mut self, peer_index: PeerIndex, transaction: IndexedTransaction) -> Option<VecDeque<IndexedTransaction>>;
fn on_notfound(&mut self, peer_index: PeerIndex, message: types::NotFound);
@ -105,10 +105,6 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
orphaned_blocks_pool: OrphanBlocksPool,
/// Orphaned transactions pool.
orphaned_transactions_pool: OrphanTransactionsPool,
/// Chain verifier
chain_verifier: Arc<ChainVerifier>,
/// Verify block headers?
verify_headers: bool,
/// Verifying blocks by peer
verifying_blocks_by_peer: HashMap<H256, PeerIndex>,
/// Verifying blocks futures
@ -166,16 +162,6 @@ enum AppendTransactionError {
Orphan(HashSet<H256>),
}
/// Blocks headers verification result
enum BlocksHeadersVerificationResult {
/// Skip these blocks headers
Skip,
/// Error during verification of header with given index
Error(usize),
/// Successful verification
Success,
}
impl State {
pub fn is_saturated(&self) -> bool {
match *self {
@ -259,11 +245,11 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Try to queue synchronization of unknown blocks when blocks headers are received.
fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) {
fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) -> Option<Vec<IndexedBlockHeader>> {
assert!(!message.headers.is_empty(), "This must be checked in incoming connection");
// transform to indexed headers
let mut headers: Vec<_> = message.headers.into_iter().map(IndexedBlockHeader::from_raw).collect();
let headers: Vec<_> = message.headers.into_iter().map(IndexedBlockHeader::from_raw).collect();
// update peers to select next tasks
self.peers_tasks.on_headers_received(peer_index);
@ -287,71 +273,85 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.peers.misbehaving(peer_index, "Too many failures.");
}
return;
return None;
}
// find first unknown header position
// optimization: normally, the first header will be unknown
let num_headers = headers.len();
let first_unknown_index = match self.chain.block_state(&header0.hash) {
BlockState::Unknown => 0,
_ => {
// optimization: if last header is known, then all headers are also known
let header_last = &headers[num_headers - 1];
match self.chain.block_state(&header_last.hash) {
BlockState::Unknown => 1 + headers.iter().skip(1)
.position(|header| self.chain.block_state(&header.hash) == BlockState::Unknown)
.expect("last header has UnknownState; we are searching for first unknown header; qed"),
// else all headers are known
_ => {
trace!(target: "sync", "Ignoring {} known headers from peer#{}", headers.len(), peer_index);
// but this peer is still useful for synchronization
self.peers_tasks.useful_peer(peer_index);
return;
},
}
}
};
let headers_in_message = headers.len();
let headers = self.find_unknown_headers(headers);
if headers.is_empty() {
trace!(target: "sync", "Ignoring {} known headers from peer#{}", headers_in_message, peer_index);
// but this peer is still useful for synchronization
self.peers_tasks.useful_peer(peer_index);
return None;
}
// validate blocks headers before scheduling
let last_known_hash = if first_unknown_index > 0 { headers[first_unknown_index - 1].hash.clone() } else { header0.raw.previous_header_hash.clone() };
let mut last_known_hash = headers[0].raw.previous_header_hash;
if self.config.close_connection_on_bad_block && self.chain.block_state(&last_known_hash) == BlockState::DeadEnd {
self.peers.misbehaving(peer_index, &format!("Provided after dead-end block {}", last_known_hash.to_reversed_str()));
return;
return None;
}
match self.verify_headers(peer_index, last_known_hash, &headers[first_unknown_index..num_headers]) {
BlocksHeadersVerificationResult::Error(error_index) => self.chain.mark_dead_end_block(&headers[first_unknown_index + error_index].hash),
BlocksHeadersVerificationResult::Skip => (),
BlocksHeadersVerificationResult::Success => {
// report progress
let num_new_headers = num_headers - first_unknown_index;
trace!(target: "sync", "New {} headers from peer#{}. First {:?}, last: {:?}",
num_new_headers,
for (header_index, header) in headers.iter().enumerate() {
// check that this header is direct child of previous header
if header.raw.previous_header_hash != last_known_hash {
self.peers.misbehaving(
peer_index,
headers[first_unknown_index].hash.to_reversed_str(),
headers[num_headers - 1].hash.to_reversed_str()
&format!(
"Neighbour headers in `headers` message are unlinked: Prev: {}, PrevLink: {}, Curr: {}",
last_known_hash.to_reversed_str(),
header.raw.previous_header_hash.to_reversed_str(),
header.hash.to_reversed_str(),
),
);
return None;
}
// prepare new headers array
let new_headers = headers.split_off(first_unknown_index);
self.chain.schedule_blocks_headers(new_headers);
// check that we do not know all blocks in range [first_unknown_index..]
// if we know some block => there has been verification error => all headers should be ignored
// see when_previous_block_verification_failed_fork_is_not_requested for details
match self.chain.block_state(&header.hash) {
BlockState::Unknown => (),
BlockState::DeadEnd if self.config.close_connection_on_bad_block => {
self.peers.misbehaving(
peer_index,
&format!(
"Provided dead-end block {:?}",
header.hash.to_reversed_str(),
),
);
return None;
},
block_state => {
trace!(
target: "sync",
"Ignoring {} headers from peer#{} - known ({:?}) header {} at the {}/{} ({}...{})",
headers.len(), peer_index, block_state, header.hash.to_reversed_str(), header_index,
headers.len(), headers[0].hash.to_reversed_str(),
headers[headers.len() - 1].hash.to_reversed_str());
self.peers_tasks.useful_peer(peer_index);
return None;
},
}
// switch to synchronization state
if !self.state.is_synchronizing() {
if self.chain.length_of_blocks_state(BlockState::Scheduled) +
self.chain.length_of_blocks_state(BlockState::Requested) == 1 {
self.switch_to_nearly_saturated_state();
} else {
self.switch_to_synchronization_state();
}
}
// these peers have supplied us with new headers => useful indeed
self.peers_tasks.useful_peer(peer_index);
// and execute tasks
self.execute_synchronization_tasks(None, None);
},
last_known_hash = header.hash;
}
// report progress
let num_new_headers = headers_in_message - headers.len();
trace!(target: "sync", "New {} headers from peer#{}. First {:?}, last: {:?}",
num_new_headers,
peer_index,
headers[0].hash.to_reversed_str(),
headers[headers.len() - 1].hash.to_reversed_str()
);
// peer has supplied us with new headers => useful indeed
self.peers_tasks.useful_peer(peer_index);
Some(headers)
}
fn on_block(&mut self, peer_index: PeerIndex, block: IndexedBlock) -> Option<VecDeque<IndexedBlock>> {
@ -427,8 +427,8 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
let blocks_hashes_to_forget: Vec<_> = blocks_to_verify.iter().map(|b| b.hash().clone()).collect();
self.chain.forget_blocks_leave_header(&blocks_hashes_to_forget);
// remember that we are verifying these blocks
let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|b| b.header.clone()).collect();
self.chain.verify_blocks(blocks_headers_to_verify);
let blocks_headers: Vec<_> = blocks_to_verify.iter().map(|b| b.header.clone()).collect();
self.chain.verify_blocks(blocks_headers);
// remember that we are verifying block from this peer
for verifying_block_hash in blocks_to_verify.iter().map(|b| b.hash().clone()) {
self.verifying_blocks_by_peer.insert(verifying_block_hash, peer_index);
@ -717,6 +717,16 @@ impl<T> CoreVerificationSink<T> where T: TaskExecutor {
impl<T> VerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
}
impl<T> HeadersVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
fn on_headers_verification_success(&self, headers: Vec<IndexedBlockHeader>) {
self.core.lock().on_headers_verification_success(headers)
}
fn on_headers_verification_error(&self, peer: PeerIndex, error: String, hash: H256) {
self.core.lock().on_headers_verification_error(peer, error, hash)
}
}
impl<T> BlockVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
/// Process successful block verification
fn on_block_verification_success(&self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
@ -743,7 +753,7 @@ impl<T> TransactionVerificationSink for CoreVerificationSink<T> where T: TaskExe
impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
/// Create new synchronization client core
pub fn new(config: Config, shared_state: SynchronizationStateRef, peers: PeersRef, executor: Arc<T>, chain: Chain, chain_verifier: Arc<ChainVerifier>) -> ClientCoreRef<Self> {
pub fn new(config: Config, shared_state: SynchronizationStateRef, peers: PeersRef, executor: Arc<T>, chain: Chain) -> ClientCoreRef<Self> {
let sync = Arc::new(Mutex::new(
SynchronizationClientCore {
shared_state: shared_state,
@ -755,8 +765,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
chain: chain,
orphaned_blocks_pool: OrphanBlocksPool::new(),
orphaned_transactions_pool: OrphanTransactionsPool::new(),
chain_verifier: chain_verifier,
verify_headers: true,
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_futures: HashMap::new(),
verifying_transactions_sinks: HashMap::new(),
@ -820,12 +828,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
&mut self.orphaned_transactions_pool
}
/// Verify block headers or not?
#[cfg(test)]
pub fn set_verify_headers(&mut self, verify: bool) {
self.verify_headers = verify;
}
/// Print synchronization information
pub fn print_synchronization_information(&mut self) {
if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
@ -857,56 +859,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Verify and select unknown headers for scheduling
fn verify_headers(&mut self, peer_index: PeerIndex, last_known_hash: H256, headers: &[IndexedBlockHeader]) -> BlocksHeadersVerificationResult {
// validate blocks headers before scheduling
let mut last_known_hash = &last_known_hash;
let mut headers_provider = MessageBlockHeadersProvider::new(&self.chain, self.chain.best_block_header().number);
for (header_index, header) in headers.iter().enumerate() {
// check that this header is direct child of previous header
if &header.raw.previous_header_hash != last_known_hash {
self.peers.misbehaving(peer_index, &format!("Neighbour headers in `headers` message are unlinked: Prev: {}, PrevLink: {}, Curr: {}",
last_known_hash.to_reversed_str(), header.raw.previous_header_hash.to_reversed_str(), header.hash.to_reversed_str()));
return BlocksHeadersVerificationResult::Skip;
}
// check that we do not know all blocks in range [first_unknown_index..]
// if we know some block => there has been verification error => all headers should be ignored
// see when_previous_block_verification_failed_fork_is_not_requested for details
match self.chain.block_state(&header.hash) {
BlockState::Unknown => (),
BlockState::DeadEnd if self.config.close_connection_on_bad_block => {
self.peers.misbehaving(peer_index, &format!("Provided dead-end block {:?}", header.hash.to_reversed_str()));
return BlocksHeadersVerificationResult::Skip;
},
block_state => {
trace!(target: "sync", "Ignoring {} headers from peer#{} - known ({:?}) header {} at the {}/{} ({}...{})",
headers.len(), peer_index, block_state, header.hash.to_reversed_str(), header_index, headers.len(),
headers[0].hash.to_reversed_str(), headers[headers.len() - 1].hash.to_reversed_str());
self.peers_tasks.useful_peer(peer_index);
return BlocksHeadersVerificationResult::Skip;
},
}
// verify header
if self.verify_headers {
if let Err(error) = self.chain_verifier.verify_block_header(&headers_provider, &header.hash, &header.raw) {
if self.config.close_connection_on_bad_block {
self.peers.misbehaving(peer_index, &format!("Error verifying header {} from `headers`: {:?}", header.hash.to_reversed_str(), error));
} else {
warn!(target: "sync", "Error verifying header {} from `headers` message: {:?}", header.hash.to_reversed_str(), error);
}
return BlocksHeadersVerificationResult::Error(header_index);
}
}
last_known_hash = &header.hash;
headers_provider.append_header(header.hash.clone(), header.clone());
}
BlocksHeadersVerificationResult::Success
}
/// Process new peer transaction
fn process_peer_transaction(&mut self, _peer_index: Option<PeerIndex>, transaction: IndexedTransaction, relay: bool) -> Option<VecDeque<IndexedTransaction>> {
match self.try_append_transaction(transaction.clone(), relay) {
@ -1052,6 +1004,71 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
fn find_unknown_headers(&self, mut headers: Vec<IndexedBlockHeader>) -> Vec<IndexedBlockHeader> {
// find first unknown header position
// optimization: normally, the first header will be unknown
let num_headers = headers.len();
let first_unknown_index = match self.chain.block_state(&headers[0].hash) {
BlockState::Unknown => 0,
_ => {
// optimization: if last header is known, then all headers are also known
let header_last = &headers[num_headers - 1];
match self.chain.block_state(&header_last.hash) {
BlockState::Unknown => 1 + headers.iter().skip(1)
.position(|header| self.chain.block_state(&header.hash) == BlockState::Unknown)
.expect("last header has UnknownState; we are searching for first unknown header; qed"),
// else all headers are known
_ => headers.len(),
}
}
};
if first_unknown_index == 0 { headers } else { headers.split_off(first_unknown_index) }
}
fn on_headers_verification_success(&mut self, headers: Vec<IndexedBlockHeader>) {
let headers = self.find_unknown_headers(headers);
if headers.is_empty() {
return;
}
self.chain.schedule_blocks_headers(headers);
// switch to synchronization state
if !self.state.is_synchronizing() {
if self.chain.length_of_blocks_state(BlockState::Scheduled) +
self.chain.length_of_blocks_state(BlockState::Requested) == 1 {
self.switch_to_nearly_saturated_state();
} else {
self.switch_to_synchronization_state();
}
}
self.execute_synchronization_tasks(None, None);
}
fn on_headers_verification_error(&mut self, peer: PeerIndex, error: String, hash: H256) {
if self.config.close_connection_on_bad_block {
self.peers.misbehaving(
peer,
&format!(
"Error verifying header {} from `headers`: {:?}",
hash.to_reversed_str(),
error,
),
);
} else {
warn!(
target: "sync",
"Error verifying header {} from `headers` message: {:?}",
hash.to_reversed_str(),
error,
);
}
self.chain.mark_dead_end_block(&hash);
self.execute_synchronization_tasks(None, None);
}
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
// update block processing speed
self.block_speed_meter.checkpoint();
@ -1301,17 +1318,16 @@ pub mod tests {
let config = Config { close_connection_on_bad_block: true };
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), ConsensusParams::new(Network::Unitest)));
let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier.clone());
{
client_core.lock().set_verify_headers(false);
}
let mut verifier = verifier.unwrap_or_default();
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
verifier.set_storage(storage);
verifier.set_memory_pool(memory_pool);
verifier.set_verifier(chain_verifier);
let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain);
let mut light_verifier = DummyVerifier::default();
light_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
let mut heavy_verifier = verifier.unwrap_or_default();
heavy_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
heavy_verifier.set_storage(storage);
heavy_verifier.set_memory_pool(memory_pool);
heavy_verifier.set_verifier(chain_verifier);
let client = SynchronizationClient::new(sync_state, client_core.clone(), verifier);
let client = SynchronizationClient::new(sync_state, client_core.clone(), light_verifier, heavy_verifier);
(executor, client_core, client)
}
@ -2125,7 +2141,7 @@ pub mod tests {
chain.mark_dead_end_block(&b1.hash());
}
core.lock().set_verify_headers(true);
// core.lock().set_verify_headers(true);
core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));

View File

@ -5,15 +5,23 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use parking_lot::Mutex;
use time::get_time;
use chain::{IndexedBlock, IndexedTransaction};
use chain::{IndexedBlockHeader, IndexedBlock, IndexedTransaction};
use network::ConsensusParams;
use primitives::hash::H256;
use verification::{BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify,
Error as VerificationError, VerificationLevel};
use types::{BlockHeight, StorageRef, MemoryPoolRef};
use types::{PeerIndex, BlockHeight, StorageRef, MemoryPoolRef};
use utils::MemoryPoolTransactionOutputProvider;
use VerificationParameters;
/// Headers verification events sink
pub trait HeadersVerificationSink : Send + Sync + 'static {
/// When headers verification has completed successfully.
fn on_headers_verification_success(&self, headers: Vec<IndexedBlockHeader>);
/// When headers verification has failed.
fn on_headers_verification_error(&self, peer: PeerIndex, error: String, hash: H256);
}
/// Block verification events sink
pub trait BlockVerificationSink : Send + Sync + 'static {
/// When block verification has completed successfully.
@ -31,12 +39,14 @@ pub trait TransactionVerificationSink : Send + Sync + 'static {
}
/// Verification events sink
pub trait VerificationSink : BlockVerificationSink + TransactionVerificationSink {
pub trait VerificationSink : HeadersVerificationSink + BlockVerificationSink + TransactionVerificationSink {
}
/// Verification thread tasks
#[derive(Debug)]
pub enum VerificationTask {
/// Verify headers
VerifyHeaders(PeerIndex, Vec<IndexedBlockHeader>),
/// Verify single block
VerifyBlock(IndexedBlock),
/// Verify single transaction
@ -47,6 +57,8 @@ pub enum VerificationTask {
/// Synchronization verifier
pub trait Verifier : Send + Sync + 'static {
/// Verify headers
fn verify_headers(&self, peer: PeerIndex, headers: Vec<IndexedBlockHeader>);
/// Verify block
fn verify_block(&self, block: IndexedBlock);
/// Verify transaction
@ -67,8 +79,10 @@ pub struct ChainVerifierWrapper {
pub verifier: Arc<ChainVerifier>,
/// Verification parameters.
verification_params: VerificationParameters,
/// Is verification edge passed.
/// True if we have passed verification edge && full verification is required.
pub enforce_full_verification: AtomicBool,
/// True if we need to actually verify headers.
verify_headers: bool,
}
impl ChainVerifierWrapper {
@ -79,6 +93,15 @@ impl ChainVerifierWrapper {
verifier: verifier,
verification_params: verification_params,
enforce_full_verification: enforce_full_verification,
verify_headers: false,
}
}
/// Verify header.
pub fn verify_block_header(&self, header: &IndexedBlockHeader) -> Result<(), VerificationError> {
match self.verify_headers {
true => self.verifier.verify_block_header(header),
false => Ok(()),
}
}
@ -138,7 +161,13 @@ impl AsyncVerifier {
}
/// Execute single verification task
pub fn execute_single_task<T: VerificationSink>(sink: &Arc<T>, storage: &StorageRef, memory_pool: &MemoryPoolRef, verifier: &ChainVerifierWrapper, task: VerificationTask) -> bool {
pub fn execute_single_task<T: VerificationSink>(
sink: &Arc<T>,
storage: &StorageRef,
memory_pool: &MemoryPoolRef,
verifier: &ChainVerifierWrapper,
task: VerificationTask,
) -> bool {
// block verification && insertion can lead to reorganization
// => transactions from decanonized blocks should be put back to the MemoryPool
// => they must be verified again
@ -148,6 +177,15 @@ impl AsyncVerifier {
while let Some(task) = tasks_queue.pop_front() {
match task {
VerificationTask::VerifyHeaders(peer, headers) => {
let result = headers.iter()
.try_for_each(|header| verifier.verify_block_header(header)
.map_err(|error| (error, header.hash)));
match result {
Ok(_) => sink.on_headers_verification_success(headers),
Err((error, hash)) => sink.on_headers_verification_error(peer, format!("{:?}", error), hash),
}
},
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify_block(&block) {
@ -185,7 +223,6 @@ impl AsyncVerifier {
}
}
impl Drop for AsyncVerifier {
fn drop(&mut self) {
if let Some(join_handle) = self.verification_worker_thread.take() {
@ -200,6 +237,13 @@ impl Drop for AsyncVerifier {
}
impl Verifier for AsyncVerifier {
/// Verify headers
fn verify_headers(&self, peer: PeerIndex, headers: Vec<IndexedBlockHeader>) {
self.verification_work_sender.lock()
.send(VerificationTask::VerifyHeaders(peer, headers))
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
}
/// Verify block
fn verify_block(&self, block: IndexedBlock) {
self.verification_work_sender.lock()
@ -236,6 +280,11 @@ impl<T> SyncVerifier<T> where T: VerificationSink {
}
impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
/// Verify headers
fn verify_headers(&self, _peer: PeerIndex, _headers: Vec<IndexedBlockHeader>) {
unreachable!("SyncVerifier is used only for blocks verification")
}
/// Verify block
fn verify_block(&self, block: IndexedBlock) {
match self.verifier.verify_block(&block) {
@ -251,7 +300,7 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
/// Verify transaction
fn verify_transaction(&self, _height: BlockHeight, _transaction: IndexedTransaction) {
unimplemented!() // sync verifier is currently only used for blocks verification
unreachable!("SyncVerifier is used only for blocks verification")
}
}
@ -269,9 +318,10 @@ pub mod tests {
use synchronization_client_core::CoreVerificationSink;
use synchronization_executor::tests::DummyTaskExecutor;
use primitives::hash::H256;
use chain::{IndexedBlock, IndexedTransaction};
use super::{Verifier, BlockVerificationSink, TransactionVerificationSink, AsyncVerifier, VerificationTask, ChainVerifierWrapper};
use types::{BlockHeight, StorageRef, MemoryPoolRef};
use chain::{IndexedBlockHeader, IndexedBlock, IndexedTransaction};
use super::{Verifier, HeadersVerificationSink, BlockVerificationSink, TransactionVerificationSink,
AsyncVerifier, VerificationTask, ChainVerifierWrapper};
use types::{PeerIndex, BlockHeight, StorageRef, MemoryPoolRef};
use VerificationParameters;
#[derive(Default)]
@ -314,6 +364,13 @@ pub mod tests {
}
impl Verifier for DummyVerifier {
fn verify_headers(&self, _peer: PeerIndex, headers: Vec<IndexedBlockHeader>) {
match self.sink {
Some(ref sink) => sink.on_headers_verification_success(headers),
_ => (),
}
}
fn verify_block(&self, block: IndexedBlock) {
match self.sink {
Some(ref sink) => match self.errors.get(&block.hash()) {

View File

@ -1,84 +0,0 @@
use std::collections::HashMap;
use chain::IndexedBlockHeader;
use storage::{BlockRef, BlockHeaderProvider};
use primitives::bytes::Bytes;
use primitives::hash::H256;
/// Block headers provider from `headers` message
pub struct MessageBlockHeadersProvider<'a> {
/// Synchronization chain headers provider
chain_provider: &'a BlockHeaderProvider,
/// headers offset
first_header_number: u32,
/// headers by hash
headers: HashMap<H256, IndexedBlockHeader>,
/// headers by order
headers_order: Vec<H256>,
}
impl<'a> MessageBlockHeadersProvider<'a> {
pub fn new(chain_provider: &'a BlockHeaderProvider, best_block_header_height: u32) -> Self {
MessageBlockHeadersProvider {
chain_provider: chain_provider,
first_header_number: best_block_header_height + 1,
headers: HashMap::new(),
headers_order: Vec::new(),
}
}
pub fn append_header(&mut self, hash: H256, header: IndexedBlockHeader) {
self.headers.insert(hash.clone(), header);
self.headers_order.push(hash);
}
}
impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> {
fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
use ser::serialize;
self.block_header(block_ref).map(|h| serialize(&h.raw))
}
fn block_header(&self, block_ref: BlockRef) -> Option<IndexedBlockHeader> {
self.chain_provider.block_header(block_ref.clone())
.or_else(move || match block_ref {
BlockRef::Hash(h) => self.headers.get(&h).cloned(),
BlockRef::Number(n) => if n >= self.first_header_number && n - self.first_header_number < self.headers_order.len() as u32 {
let header_hash = &self.headers_order[(n - self.first_header_number) as usize];
Some(self.headers[header_hash].clone())
} else {
None
},
})
}
}
#[cfg(test)]
mod tests {
extern crate test_data;
use storage::{AsSubstore, BlockHeaderProvider, BlockRef};
use db::BlockChainDatabase;
use primitives::hash::H256;
use super::MessageBlockHeadersProvider;
#[test]
fn test_message_block_headers_provider() {
let storage = BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]);
let storage_provider = storage.as_block_header_provider();
let mut headers_provider = MessageBlockHeadersProvider::new(storage_provider, 0);
assert_eq!(headers_provider.block_header(BlockRef::Hash(test_data::genesis().hash())), Some(test_data::genesis().block_header.into()));
assert_eq!(headers_provider.block_header(BlockRef::Number(0)), Some(test_data::genesis().block_header.into()));
assert_eq!(headers_provider.block_header(BlockRef::Hash(H256::from(1))), None);
assert_eq!(headers_provider.block_header(BlockRef::Number(1)), None);
headers_provider.append_header(test_data::block_h1().hash(), test_data::block_h1().block_header.into());
assert_eq!(headers_provider.block_header(BlockRef::Hash(test_data::genesis().hash())), Some(test_data::genesis().block_header.into()));
assert_eq!(headers_provider.block_header(BlockRef::Number(0)), Some(test_data::genesis().block_header.into()));
assert_eq!(headers_provider.block_header(BlockRef::Hash(test_data::block_h1().hash())), Some(test_data::block_h1().block_header.into()));
assert_eq!(headers_provider.block_header(BlockRef::Number(1)), Some(test_data::block_h1().block_header.into()));
assert_eq!(headers_provider.block_header(BlockRef::Hash(H256::from(1))), None);
assert_eq!(headers_provider.block_header(BlockRef::Number(2)), None);
}
}

View File

@ -6,7 +6,6 @@ mod fee_rate_filter;
mod hash_queue;
mod known_hash_filter;
mod memory_pool_transaction_provider;
mod message_block_headers_provider;
mod orphan_blocks_pool;
mod orphan_transactions_pool;
mod partial_merkle_tree;
@ -20,7 +19,6 @@ pub use self::fee_rate_filter::FeeRateFilter;
pub use self::hash_queue::{HashQueue, HashQueueChain, HashPosition};
pub use self::known_hash_filter::{KnownHashType, KnownHashFilter};
pub use self::memory_pool_transaction_provider::MemoryPoolTransactionOutputProvider;
pub use self::message_block_headers_provider::MessageBlockHeadersProvider;
pub use self::orphan_blocks_pool::OrphanBlocksPool;
pub use self::orphan_transactions_pool::{OrphanTransactionsPool, OrphanTransaction};
pub use self::partial_merkle_tree::{PartialMerkleTree, build_partial_merkle_tree};

View File

@ -1,7 +1,6 @@
//! Bitcoin chain verifier
use hash::H256;
use chain::{IndexedBlock, IndexedBlockHeader, BlockHeader, IndexedTransaction};
use chain::{IndexedBlock, IndexedBlockHeader, IndexedTransaction};
use storage::{SharedStore, TransactionOutputProvider, BlockHeaderProvider, BlockOrigin,
DuplexTransactionOutputProvider, NoopStore};
use network::ConsensusParams;
@ -85,15 +84,10 @@ impl BackwardsCompatibleChainVerifier {
pub fn verify_block_header(
&self,
_block_header_provider: &BlockHeaderProvider,
hash: &H256,
header: &BlockHeader
header: &IndexedBlockHeader,
) -> Result<(), Error> {
// let's do only preverifcation
// TODO: full verification
let current_time = ::time::get_time().sec as u32;
let header = IndexedBlockHeader::new(hash.clone(), header.clone());
let header_verifier = HeaderVerifier::new(&header, &self.consensus, current_time);
let header_verifier = HeaderVerifier::new(header, &self.consensus, current_time);
header_verifier.check()
}