diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 2a0ae7c4..8dd68875 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -74,10 +74,17 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig}; use synchronization_verifier::AsyncVerifier; + let sync_client_config = SynchronizationConfig { + // during regtests, peer is providing us with bad blocks => we shouldn't close connection because of this + close_connection_on_bad_block: network != Magic::Regtest, + // TODO: remove me + threads_num: 4, + }; + let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); - let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network); + let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), network); let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone()); let sync_client = SynchronizationClient::new(sync_client_core, verifier); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 749f1512..8849b678 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -242,6 +242,8 @@ pub trait ClientCore : VerificationSink { /// Synchronization client configuration options. #[derive(Debug)] pub struct Config { + /// If true, connection to peer who has provided us with bad block is closed + pub close_connection_on_bad_block: bool, /// Number of threads to allocate in synchronization CpuPool. pub threads_num: usize, } @@ -310,6 +312,8 @@ pub struct SynchronizationClientCore { block_speed_meter: AverageSpeedMeter, /// Block synchronization speed meter sync_speed_meter: AverageSpeedMeter, + /// Configuration + config: Config, } /// Block headers provider from `headers` message @@ -337,8 +341,10 @@ struct AverageSpeedMeter { } impl Config { + #[cfg(test)] pub fn new() -> Self { Config { + close_connection_on_bad_block: true, threads_num: 4, } } @@ -603,7 +609,12 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { let unknown_blocks_hashes: Vec<_> = { let chain = self.chain.read(); blocks_hashes.into_iter() - .filter(|h| chain.block_state(h) == BlockState::Unknown) + .filter(|h| { + // if we haven't closed connection after receiving dead-end block + // => also process dead-end blocks + let block_state = chain.block_state(h); + block_state == BlockState::Unknown || (block_state == BlockState::DeadEnd && !self.config.close_connection_on_bad_block) + }) .filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h)) .collect() }; @@ -1018,8 +1029,10 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor // close connection with this peer if let Some(peer_index) = self.verifying_blocks_by_peer.get(hash) { - warn!(target: "sync", "Closing connection with peer#{} as it has provided us with wrong block {:?}", peer_index, hash.to_reversed_str()); - self.executor.lock().execute(Task::Close(*peer_index)); + warn!(target: "sync", "Peer#{} has provided us with wrong block {:?}", peer_index, hash.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(*peer_index)); + } } { @@ -1103,6 +1116,7 @@ impl SynchronizationClientCore where T: TaskExecutor { do_not_relay: HashSet::new(), block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT), sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT), + config: config, } )); @@ -1267,11 +1281,8 @@ impl SynchronizationClientCore where T: TaskExecutor { assert_eq!(hashes.len(), headers.len()); let mut chain = self.chain.write(); - match chain.intersect_with_blocks_headers(&hashes, &headers) { - HeadersIntersection::DeadEnd(dead_block_index) => { - warn!(target: "sync", "Closing connection with peer#{} as it has provided us with dead-end block {:?}", peer_index, hashes[dead_block_index].to_reversed_str()); - self.executor.lock().execute(Task::Close(peer_index)); - }, + let intersection_result = chain.intersect_with_blocks_headers(&hashes, &headers); + match intersection_result { HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => { warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index); }, @@ -1290,7 +1301,16 @@ impl SynchronizationClientCore where T: TaskExecutor { HeadersIntersection::InMemoryMainNewBlocks(new_block_index) | HeadersIntersection::InMemoryForkNewBlocks(new_block_index) | HeadersIntersection::DbForkNewBlocks(new_block_index) - | HeadersIntersection::NoKnownBlocks(new_block_index) => { + | HeadersIntersection::NoKnownBlocks(new_block_index) + | HeadersIntersection::DeadEnd(new_block_index) => { + if let HeadersIntersection::DeadEnd(dead_block_index) = intersection_result { + warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, hashes[dead_block_index].to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + return; + } + } + // check that we do not know all blocks in range [new_block_index..] // if we know some block => there has been verification error => all headers should be ignored // see when_previous_block_verification_failed_fork_is_not_requested for details @@ -1299,10 +1319,14 @@ impl SynchronizationClientCore where T: TaskExecutor { match block_state { BlockState::Unknown => false, BlockState::DeadEnd => { - warn!(target: "sync", "Closing connection with peer#{} as it has provided us with blocks lead to dead-end block {:?}", peer_index, h.to_reversed_str()); - self.executor.lock().execute(Task::Close(peer_index)); - true - }, + warn!(target: "sync", "Peer#{} has provided us with blocks leading to dead-end block {:?}", peer_index, h.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + true + } else { + false + } + }, _ => true, } }) { @@ -1342,23 +1366,32 @@ impl SynchronizationClientCore where T: TaskExecutor { // prepare list of blocks to verify + make all required changes to the chain let mut result: Option> = None; let mut chain = self.chain.write(); - match chain.block_state(&block_hash) { - BlockState::DeadEnd => { - warn!(target: "sync", "Closing connection with peer#{} as it has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str()); - self.executor.lock().execute(Task::Close(peer_index)); - }, + let block_state = chain.block_state(&block_hash); + match block_state { BlockState::Verifying | BlockState::Stored => { // remember peer as useful self.peers.useful_peer(peer_index); }, - BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => { - // check parent block state - match chain.block_state(&block.header().previous_header_hash) { - BlockState::DeadEnd => { - warn!(target: "sync", "Closing connection with peer#{} as it has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str()); + BlockState::Unknown | BlockState::Scheduled | BlockState::Requested | BlockState::DeadEnd => { + if block_state == BlockState::DeadEnd { + warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str()); + if self.config.close_connection_on_bad_block { self.executor.lock().execute(Task::Close(peer_index)); - }, - BlockState::Unknown => { + } + } + + // check parent block state + let parent_block_state = chain.block_state(&block.header().previous_header_hash); + match parent_block_state { + BlockState::Unknown | BlockState::DeadEnd => { + if parent_block_state == BlockState::DeadEnd { + warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + return result; + } + } + if self.state.is_synchronizing() { // when synchronizing, we tend to receive all blocks in-order trace!(