Merge pull request #258 from ethcore/sync_close_connection_after_verify_error

Close connection with peer when block verification failed with error
This commit is contained in:
Svyatoslav Nikolsky 2016-12-05 13:38:00 +03:00 committed by GitHub
commit 36db70eb86
8 changed files with 304 additions and 32 deletions

View File

@ -3,6 +3,7 @@ use parking_lot::Mutex;
use message::{Payload, Message};
use p2p::Context;
use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses};
use futures::{lazy, finished};
pub struct PeerContext {
context: Arc<Context>,
@ -104,6 +105,17 @@ impl PeerContext {
}
}
/// Closes this context
pub fn close(&self) {
let context = self.context.clone();
let peer_id = self.info.id;
let close = lazy(move || {
context.close_channel(peer_id);
finished::<(), ()>(())
});
self.context.spawn(close);
}
pub fn info(&self) -> &PeerInfo {
&self.info
}

View File

@ -60,6 +60,7 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_block_txn(&self, message: &types::BlockTxn);
fn send_notfound(&self, message: &types::NotFound);
fn ignored(&self, id: u32);
fn close(&self);
}
struct OutboundSync {
@ -162,6 +163,10 @@ impl OutboundSyncConnection for OutboundSync {
fn ignored(&self, id: u32) {
self.context.ignore_response(id);
}
fn close(&self) {
self.context.close()
}
}
pub struct SyncProtocol {

View File

@ -74,10 +74,17 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
use synchronization_verifier::AsyncVerifier;
let sync_client_config = SynchronizationConfig {
// during regtests, peer is providing us with bad blocks => we shouldn't close connection because of this
close_connection_on_bad_block: network != Magic::Regtest,
// TODO: remove me
threads_num: 4,
};
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network);
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), network);
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));

View File

@ -305,6 +305,7 @@ mod tests {
fn send_block_txn(&self, _message: &types::BlockTxn) {}
fn send_notfound(&self, _message: &types::NotFound) {}
fn ignored(&self, _id: u32) {}
fn close(&self) {}
}
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
@ -313,7 +314,7 @@ mod tests {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { threads_num: 1 };
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Mainnet);
let mut verifier = DummyVerifier::default();
verifier.set_sink(client_core.clone());

View File

@ -1,6 +1,6 @@
use std::fmt;
use std::sync::Arc;
use std::collections::VecDeque;
use std::collections::{VecDeque, HashSet};
use linked_hash_map::LinkedHashMap;
use parking_lot::RwLock;
use chain::{BlockHeader, Transaction};
@ -55,6 +55,8 @@ pub enum BlockState {
Verifying,
/// In storage
Stored,
/// This block has been marked as dead-end block
DeadEnd,
}
/// Transactions synchronization state
@ -101,6 +103,8 @@ pub enum HeadersIntersection {
DbAllBlocksKnown,
/// 3.4: No intersection with in-memory queue && has intersection with db && some blocks are not yet stored in db
DbForkNewBlocks(usize),
/// Dead-end blocks are starting from given index
DeadEnd(usize),
}
/// Blockchain from synchroniation point of view, consisting of:
@ -123,6 +127,8 @@ pub struct Chain {
verifying_transactions: LinkedHashMap<H256, Transaction>,
/// Transactions memory pool
memory_pool: MemoryPool,
/// Blocks that have been marked as dead-ends
dead_end_blocks: HashSet<H256>,
}
impl BlockState {
@ -163,6 +169,7 @@ impl Chain {
headers_chain: BestHeadersChain::new(best_storage_block_hash),
verifying_transactions: LinkedHashMap::new(),
memory_pool: MemoryPool::new(),
dead_end_blocks: HashSet::new(),
}
}
@ -276,7 +283,11 @@ impl Chain {
None => if self.storage.contains_block(db::BlockRef::Hash(hash.clone())) {
BlockState::Stored
} else {
BlockState::Unknown
if self.dead_end_blocks.contains(hash) {
BlockState::DeadEnd
} else {
BlockState::Unknown
}
},
}
}
@ -334,6 +345,11 @@ impl Chain {
requested
}
/// Mark this block as dead end, so these tasks won't be synchronized
pub fn mark_dead_end_block(&mut self, hash: &H256) {
self.dead_end_blocks.insert(hash.clone());
}
/// Insert new best block to storage
pub fn insert_best_block(&mut self, hash: H256, block: &IndexedBlock) -> Result<BlockInsertionResult, db::Error> {
let is_appending_to_main_branch = self.best_storage_block.hash == block.header().previous_header_hash;
@ -516,6 +532,10 @@ impl Chain {
state => (true, state),
};
match first_state {
// if first block of inventory is dead-end, then all other blocks are also dead-end blocks
BlockState::DeadEnd => {
HeadersIntersection::DeadEnd(0)
},
// if first block of inventory is unknown && its parent is unknonw => all other blocks are also unknown
BlockState::Unknown => {
HeadersIntersection::NoKnownBlocks(0)
@ -542,14 +562,18 @@ impl Chain {
}
},
// if first block is known && last block is unknown => intersection with queue or with db
BlockState::Unknown if is_first_known => {
BlockState::Unknown | BlockState::DeadEnd if is_first_known => {
// find last known block
let mut previous_state = first_block_state;
for (index, hash) in hashes.iter().enumerate().take(hashes_len).skip(1) {
let state = self.block_state(hash);
if state == BlockState::Unknown {
if state == BlockState::Unknown || state == BlockState::DeadEnd {
// if state is dead end => there are no useful blocks
if state == BlockState::DeadEnd {
return HeadersIntersection::DeadEnd(index);
}
// previous block is stored => fork from stored block
if previous_state == BlockState::Stored {
else if previous_state == BlockState::Stored {
return HeadersIntersection::DbForkNewBlocks(index);
}
// previous block is best block => no fork
@ -1194,4 +1218,49 @@ mod tests {
// => tx2 is removed from memory pool, but tx3 remains
assert_eq!(chain.information().transactions.transactions_count, 1);
}
#[test]
fn chain_dead_end() {
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
let blocks = test_data::build_n_empty_blocks_from(5, 0, &test_data::genesis().block_header);
let headers: Vec<_> = blocks.iter().map(|b| b.block_header.clone()).collect();
let hashes: Vec<_> = headers.iter().map(|h| h.hash()).collect();
chain.insert_best_block(blocks[0].hash(), &blocks[0].clone().into()).expect("no error");
chain.insert_best_block(blocks[1].hash(), &blocks[1].clone().into()).expect("no error");
chain.mark_dead_end_block(&blocks[2].hash());
assert_eq!(chain.intersect_with_blocks_headers(&vec![
hashes[0].clone(),
hashes[1].clone(),
hashes[2].clone(),
hashes[3].clone(),
hashes[4].clone(),
], &vec![
headers[0].clone(),
headers[1].clone(),
headers[2].clone(),
headers[3].clone(),
headers[4].clone(),
]), HeadersIntersection::DeadEnd(2));
assert_eq!(chain.intersect_with_blocks_headers(&vec![
hashes[2].clone(),
hashes[3].clone(),
hashes[4].clone(),
], &vec![
headers[2].clone(),
headers[3].clone(),
headers[4].clone(),
]), HeadersIntersection::DeadEnd(0));
assert_eq!(chain.intersect_with_blocks_headers(&vec![
hashes[3].clone(),
hashes[4].clone(),
], &vec![
headers[3].clone(),
headers[4].clone(),
]), HeadersIntersection::DeadEnd(0));
}
}

View File

@ -242,6 +242,8 @@ pub trait ClientCore : VerificationSink {
/// Synchronization client configuration options.
#[derive(Debug)]
pub struct Config {
/// If true, connection to peer who has provided us with bad block is closed
pub close_connection_on_bad_block: bool,
/// Number of threads to allocate in synchronization CpuPool.
pub threads_num: usize,
}
@ -310,6 +312,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
block_speed_meter: AverageSpeedMeter,
/// Block synchronization speed meter
sync_speed_meter: AverageSpeedMeter,
/// Configuration
config: Config,
}
/// Block headers provider from `headers` message
@ -336,14 +340,6 @@ struct AverageSpeedMeter {
last_timestamp: Option<f64>,
}
impl Config {
pub fn new() -> Self {
Config {
threads_num: 4,
}
}
}
impl FilteredInventory {
#[cfg(test)]
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
@ -603,7 +599,12 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
let unknown_blocks_hashes: Vec<_> = {
let chain = self.chain.read();
blocks_hashes.into_iter()
.filter(|h| chain.block_state(h) == BlockState::Unknown)
.filter(|h| {
// if we haven't closed connection after receiving dead-end block
// => also process dead-end blocks
let block_state = chain.block_state(h);
block_state == BlockState::Unknown || (block_state == BlockState::DeadEnd && !self.config.close_connection_on_bad_block)
})
.filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h))
.collect()
};
@ -1016,12 +1017,23 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
self.do_not_relay.remove(hash);
// close connection with this peer
if let Some(peer_index) = self.verifying_blocks_by_peer.get(hash) {
warn!(target: "sync", "Peer#{} has provided us with wrong block {:?}", peer_index, hash.to_reversed_str());
if self.config.close_connection_on_bad_block {
self.executor.lock().execute(Task::Close(*peer_index));
}
}
{
let mut chain = self.chain.write();
// forget for this block and all its children
// headers are also removed as they all are invalid
chain.forget_block_with_children(hash);
// mark failed block as dead end (this branch won't be synchronized)
chain.mark_dead_end_block(hash);
}
// awake threads, waiting for this block insertion
@ -1094,6 +1106,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
do_not_relay: HashSet::new(),
block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
config: config,
}
));
@ -1258,7 +1271,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
assert_eq!(hashes.len(), headers.len());
let mut chain = self.chain.write();
match chain.intersect_with_blocks_headers(&hashes, &headers) {
let intersection_result = chain.intersect_with_blocks_headers(&hashes, &headers);
match intersection_result {
HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => {
warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index);
},
@ -1277,11 +1291,35 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
HeadersIntersection::InMemoryMainNewBlocks(new_block_index)
| HeadersIntersection::InMemoryForkNewBlocks(new_block_index)
| HeadersIntersection::DbForkNewBlocks(new_block_index)
| HeadersIntersection::NoKnownBlocks(new_block_index) => {
| HeadersIntersection::NoKnownBlocks(new_block_index)
| HeadersIntersection::DeadEnd(new_block_index) => {
if let HeadersIntersection::DeadEnd(dead_block_index) = intersection_result {
warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, hashes[dead_block_index].to_reversed_str());
if self.config.close_connection_on_bad_block {
self.executor.lock().execute(Task::Close(peer_index));
return;
}
}
// check that we do not know all blocks in range [new_block_index..]
// if we know some block => there has been verification error => all headers should be ignored
// see when_previous_block_verification_failed_fork_is_not_requested for details
if hashes.iter().skip(new_block_index).any(|h| chain.block_state(h) != BlockState::Unknown) {
if hashes.iter().skip(new_block_index).any(|h| {
let block_state = chain.block_state(h);
match block_state {
BlockState::Unknown => false,
BlockState::DeadEnd => {
warn!(target: "sync", "Peer#{} has provided us with blocks leading to dead-end block {:?}", peer_index, h.to_reversed_str());
if self.config.close_connection_on_bad_block {
self.executor.lock().execute(Task::Close(peer_index));
true
} else {
false
}
},
_ => true,
}
}) {
return;
}
@ -1318,15 +1356,32 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
// prepare list of blocks to verify + make all required changes to the chain
let mut result: Option<VecDeque<(H256, IndexedBlock)>> = None;
let mut chain = self.chain.write();
match chain.block_state(&block_hash) {
let block_state = chain.block_state(&block_hash);
match block_state {
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.useful_peer(peer_index);
},
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested | BlockState::DeadEnd => {
if block_state == BlockState::DeadEnd {
warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str());
if self.config.close_connection_on_bad_block {
self.executor.lock().execute(Task::Close(peer_index));
}
}
// check parent block state
match chain.block_state(&block.header().previous_header_hash) {
BlockState::Unknown => {
let parent_block_state = chain.block_state(&block.header().previous_header_hash);
match parent_block_state {
BlockState::Unknown | BlockState::DeadEnd => {
if parent_block_state == BlockState::DeadEnd {
warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str());
if self.config.close_connection_on_bad_block {
self.executor.lock().execute(Task::Close(peer_index));
return result;
}
}
if self.state.is_synchronizing() {
// when synchronizing, we tend to receive all blocks in-order
trace!(
@ -1648,7 +1703,7 @@ pub mod tests {
};
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
let executor = DummyTaskExecutor::new();
let config = Config { threads_num: 1 };
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Testnet);
{
@ -2605,4 +2660,85 @@ pub mod tests {
assert_eq!(headers_provider.block_header(db::BlockRef::Hash(H256::from(1))), None);
assert_eq!(headers_provider.block_header(db::BlockRef::Number(2)), None);
}
#[test]
fn collection_closed_on_block_verification_error() {
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
// simulate verification error during b0 verification
let mut dummy_verifier = DummyVerifier::default();
dummy_verifier.error_when_verifying(b0.hash(), "simulated");
let (_, _, executor, _, sync) = create_sync(None, Some(dummy_verifier));
sync.lock().on_peer_block(0, b0.into());
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::Close(0)]);
}
#[test]
fn collection_closed_on_begin_dead_end_block_header() {
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let (_, _, executor, chain, sync) = create_sync(None, None);
{
chain.write().mark_dead_end_block(&b0.hash());
}
sync.lock().on_new_blocks_headers(0, vec![b0.block_header.clone(), b1.block_header.clone(), b2.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::Close(0)]);
}
#[test]
fn collection_closed_on_in_middle_dead_end_block_header() {
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let (_, _, executor, chain, sync) = create_sync(None, None);
{
chain.write().mark_dead_end_block(&b1.hash());
}
sync.lock().on_new_blocks_headers(0, vec![b0.block_header.clone(), b1.block_header.clone(), b2.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::Close(0)]);
}
#[test]
fn collection_closed_on_providing_dead_end_block() {
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let (_, _, executor, chain, sync) = create_sync(None, None);
{
chain.write().mark_dead_end_block(&b0.hash());
}
sync.lock().on_peer_block(0, b0.into());
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::Close(0)]);
}
#[test]
fn collection_closed_on_providing_child_dead_end_block() {
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let (_, _, executor, chain, sync) = create_sync(None, None);
{
chain.write().mark_dead_end_block(&b0.hash());
}
sync.lock().on_peer_block(0, b1.into());
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::Close(0)]);
}
}

View File

@ -46,6 +46,8 @@ pub enum Task {
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>),
/// Notify io about ignored request
Ignore(usize, u32),
/// Close connection with this peer
Close(usize),
}
/// Synchronization tasks executor
@ -56,6 +58,28 @@ pub struct LocalSynchronizationTaskExecutor {
chain: ChainRef,
}
impl Task {
#[cfg(test)]
pub fn peer_index(&self) -> usize {
match *self {
Task::RequestBlocks(peer_index, _) => peer_index,
Task::RequestBlocksHeaders(peer_index) => peer_index,
Task::RequestTransactions(peer_index, _) => peer_index,
Task::RequestMemoryPool(peer_index) => peer_index,
Task::SendBlock(peer_index, _) => peer_index,
Task::SendMerkleBlock(peer_index, _) => peer_index,
Task::SendTransaction(peer_index, _) => peer_index,
Task::SendBlockTxn(peer_index, _, _) => peer_index,
Task::SendNotFound(peer_index, _) => peer_index,
Task::SendInventory(peer_index, _) => peer_index,
Task::SendHeaders(peer_index, _, _) => peer_index,
Task::SendCompactBlocks(peer_index, _) => peer_index,
Task::Ignore(peer_index, _) => peer_index,
Task::Close(peer_index) => peer_index,
}
}
}
impl LocalSynchronizationTaskExecutor {
pub fn new(chain: ChainRef) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(LocalSynchronizationTaskExecutor {
@ -215,6 +239,12 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.ignored(id);
}
},
Task::Close(peer_index) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Closing request with peer#{}", peer_index);
connection.close();
}
},
}
}
}
@ -228,10 +258,12 @@ pub mod tests {
use parking_lot::{Mutex, Condvar};
use local_node::PeersConnections;
use p2p::OutboundSyncConnectionRef;
use std::collections::HashSet;
pub struct DummyTaskExecutor {
tasks: Vec<Task>,
waiter: Arc<Condvar>,
closed: HashSet<usize>,
}
impl DummyTaskExecutor {
@ -239,6 +271,7 @@ pub mod tests {
Arc::new(Mutex::new(DummyTaskExecutor {
tasks: Vec::new(),
waiter: Arc::new(Condvar::new()),
closed: HashSet::new(),
}))
}
@ -267,6 +300,13 @@ pub mod tests {
impl TaskExecutor for DummyTaskExecutor {
fn execute(&mut self, task: Task) {
match task {
Task::Close(id) => {
self.closed.insert(id);
()
},
_ => if self.closed.contains(&task.peer_index()) { return },
}
self.tasks.push(task);
self.waiter.notify_one();
}

View File

@ -280,11 +280,10 @@ impl SynchronizationServer {
},
// `getblocktxn` => `blocktxn`
ServerTask::ServeGetBlockTxn(block_hash, indexes) => {
let transactions = {
let (close, transactions) = {
let chain = chain.read();
let storage = chain.storage();
if let Some(block) = storage.block(db::BlockRef::Hash(block_hash.clone())) {
let requested_len = indexes.len();
let transactions_len = block.transactions.len();
let mut read_indexes = HashSet::new();
@ -302,20 +301,23 @@ impl SynchronizationServer {
.map(Option::unwrap) // take_while above
.collect();
if transactions.len() == requested_len {
Some(transactions)
(false, Some(transactions))
} else {
// TODO: malformed
None
warn!(target: "sync", "Closing connection with peer#{} as it has requested unknown block_txn from block {:?}", peer_index, block_hash.to_reversed_str());
(true, None)
}
} else {
// TODO: else malformed
None
warn!(target: "sync", "Closing connection with peer#{} as it has requested block_txn from unknown block {:?}", peer_index, block_hash.to_reversed_str());
(true, None)
}
};
if let Some(transactions) = transactions {
trace!(target: "sync", "Going to respond with {} blocktxn transactions to peer#{}", transactions.len(), peer_index);
executor.lock().execute(Task::SendBlockTxn(peer_index, block_hash, transactions));
}
if close {
executor.lock().execute(Task::Close(peer_index));
}
},
// `mempool` => `inventory`
ServerTask::ServeMempool => {
@ -766,7 +768,7 @@ pub mod tests {
server.serve_get_block_txn(0, test_data::genesis().hash(), vec![1]).map(|t| server.add_task(0, t));
// server responds with transactions
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Close(0)]);
}
#[test]