config flag for passing regtests
This commit is contained in:
parent
2b1fecb286
commit
c9605c05bc
|
@ -74,10 +74,17 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
|
||||||
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
|
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
|
||||||
use synchronization_verifier::AsyncVerifier;
|
use synchronization_verifier::AsyncVerifier;
|
||||||
|
|
||||||
|
let sync_client_config = SynchronizationConfig {
|
||||||
|
// during regtests, peer is providing us with bad blocks => we shouldn't close connection because of this
|
||||||
|
close_connection_on_bad_block: network != Magic::Regtest,
|
||||||
|
// TODO: remove me
|
||||||
|
threads_num: 4,
|
||||||
|
};
|
||||||
|
|
||||||
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
|
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
|
||||||
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
||||||
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
||||||
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network);
|
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), network);
|
||||||
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
|
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
|
||||||
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
||||||
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
||||||
|
|
|
@ -242,6 +242,8 @@ pub trait ClientCore : VerificationSink {
|
||||||
/// Synchronization client configuration options.
|
/// Synchronization client configuration options.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
/// If true, connection to peer who has provided us with bad block is closed
|
||||||
|
pub close_connection_on_bad_block: bool,
|
||||||
/// Number of threads to allocate in synchronization CpuPool.
|
/// Number of threads to allocate in synchronization CpuPool.
|
||||||
pub threads_num: usize,
|
pub threads_num: usize,
|
||||||
}
|
}
|
||||||
|
@ -310,6 +312,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
|
||||||
block_speed_meter: AverageSpeedMeter,
|
block_speed_meter: AverageSpeedMeter,
|
||||||
/// Block synchronization speed meter
|
/// Block synchronization speed meter
|
||||||
sync_speed_meter: AverageSpeedMeter,
|
sync_speed_meter: AverageSpeedMeter,
|
||||||
|
/// Configuration
|
||||||
|
config: Config,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block headers provider from `headers` message
|
/// Block headers provider from `headers` message
|
||||||
|
@ -337,8 +341,10 @@ struct AverageSpeedMeter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
#[cfg(test)]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Config {
|
Config {
|
||||||
|
close_connection_on_bad_block: true,
|
||||||
threads_num: 4,
|
threads_num: 4,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -603,7 +609,12 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
let unknown_blocks_hashes: Vec<_> = {
|
let unknown_blocks_hashes: Vec<_> = {
|
||||||
let chain = self.chain.read();
|
let chain = self.chain.read();
|
||||||
blocks_hashes.into_iter()
|
blocks_hashes.into_iter()
|
||||||
.filter(|h| chain.block_state(h) == BlockState::Unknown)
|
.filter(|h| {
|
||||||
|
// if we haven't closed connection after receiving dead-end block
|
||||||
|
// => also process dead-end blocks
|
||||||
|
let block_state = chain.block_state(h);
|
||||||
|
block_state == BlockState::Unknown || (block_state == BlockState::DeadEnd && !self.config.close_connection_on_bad_block)
|
||||||
|
})
|
||||||
.filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h))
|
.filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h))
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
@ -1018,8 +1029,10 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
||||||
|
|
||||||
// close connection with this peer
|
// close connection with this peer
|
||||||
if let Some(peer_index) = self.verifying_blocks_by_peer.get(hash) {
|
if let Some(peer_index) = self.verifying_blocks_by_peer.get(hash) {
|
||||||
warn!(target: "sync", "Closing connection with peer#{} as it has provided us with wrong block {:?}", peer_index, hash.to_reversed_str());
|
warn!(target: "sync", "Peer#{} has provided us with wrong block {:?}", peer_index, hash.to_reversed_str());
|
||||||
self.executor.lock().execute(Task::Close(*peer_index));
|
if self.config.close_connection_on_bad_block {
|
||||||
|
self.executor.lock().execute(Task::Close(*peer_index));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -1103,6 +1116,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
do_not_relay: HashSet::new(),
|
do_not_relay: HashSet::new(),
|
||||||
block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
|
block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
|
||||||
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
|
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
|
||||||
|
config: config,
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
|
|
||||||
|
@ -1267,11 +1281,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
assert_eq!(hashes.len(), headers.len());
|
assert_eq!(hashes.len(), headers.len());
|
||||||
|
|
||||||
let mut chain = self.chain.write();
|
let mut chain = self.chain.write();
|
||||||
match chain.intersect_with_blocks_headers(&hashes, &headers) {
|
let intersection_result = chain.intersect_with_blocks_headers(&hashes, &headers);
|
||||||
HeadersIntersection::DeadEnd(dead_block_index) => {
|
match intersection_result {
|
||||||
warn!(target: "sync", "Closing connection with peer#{} as it has provided us with dead-end block {:?}", peer_index, hashes[dead_block_index].to_reversed_str());
|
|
||||||
self.executor.lock().execute(Task::Close(peer_index));
|
|
||||||
},
|
|
||||||
HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => {
|
HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => {
|
||||||
warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index);
|
warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index);
|
||||||
},
|
},
|
||||||
|
@ -1290,7 +1301,16 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
HeadersIntersection::InMemoryMainNewBlocks(new_block_index)
|
HeadersIntersection::InMemoryMainNewBlocks(new_block_index)
|
||||||
| HeadersIntersection::InMemoryForkNewBlocks(new_block_index)
|
| HeadersIntersection::InMemoryForkNewBlocks(new_block_index)
|
||||||
| HeadersIntersection::DbForkNewBlocks(new_block_index)
|
| HeadersIntersection::DbForkNewBlocks(new_block_index)
|
||||||
| HeadersIntersection::NoKnownBlocks(new_block_index) => {
|
| HeadersIntersection::NoKnownBlocks(new_block_index)
|
||||||
|
| HeadersIntersection::DeadEnd(new_block_index) => {
|
||||||
|
if let HeadersIntersection::DeadEnd(dead_block_index) = intersection_result {
|
||||||
|
warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, hashes[dead_block_index].to_reversed_str());
|
||||||
|
if self.config.close_connection_on_bad_block {
|
||||||
|
self.executor.lock().execute(Task::Close(peer_index));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check that we do not know all blocks in range [new_block_index..]
|
// check that we do not know all blocks in range [new_block_index..]
|
||||||
// if we know some block => there has been verification error => all headers should be ignored
|
// if we know some block => there has been verification error => all headers should be ignored
|
||||||
// see when_previous_block_verification_failed_fork_is_not_requested for details
|
// see when_previous_block_verification_failed_fork_is_not_requested for details
|
||||||
|
@ -1299,10 +1319,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
match block_state {
|
match block_state {
|
||||||
BlockState::Unknown => false,
|
BlockState::Unknown => false,
|
||||||
BlockState::DeadEnd => {
|
BlockState::DeadEnd => {
|
||||||
warn!(target: "sync", "Closing connection with peer#{} as it has provided us with blocks lead to dead-end block {:?}", peer_index, h.to_reversed_str());
|
warn!(target: "sync", "Peer#{} has provided us with blocks leading to dead-end block {:?}", peer_index, h.to_reversed_str());
|
||||||
self.executor.lock().execute(Task::Close(peer_index));
|
if self.config.close_connection_on_bad_block {
|
||||||
true
|
self.executor.lock().execute(Task::Close(peer_index));
|
||||||
},
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
_ => true,
|
_ => true,
|
||||||
}
|
}
|
||||||
}) {
|
}) {
|
||||||
|
@ -1342,23 +1366,32 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
// prepare list of blocks to verify + make all required changes to the chain
|
// prepare list of blocks to verify + make all required changes to the chain
|
||||||
let mut result: Option<VecDeque<(H256, IndexedBlock)>> = None;
|
let mut result: Option<VecDeque<(H256, IndexedBlock)>> = None;
|
||||||
let mut chain = self.chain.write();
|
let mut chain = self.chain.write();
|
||||||
match chain.block_state(&block_hash) {
|
let block_state = chain.block_state(&block_hash);
|
||||||
BlockState::DeadEnd => {
|
match block_state {
|
||||||
warn!(target: "sync", "Closing connection with peer#{} as it has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str());
|
|
||||||
self.executor.lock().execute(Task::Close(peer_index));
|
|
||||||
},
|
|
||||||
BlockState::Verifying | BlockState::Stored => {
|
BlockState::Verifying | BlockState::Stored => {
|
||||||
// remember peer as useful
|
// remember peer as useful
|
||||||
self.peers.useful_peer(peer_index);
|
self.peers.useful_peer(peer_index);
|
||||||
},
|
},
|
||||||
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
|
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested | BlockState::DeadEnd => {
|
||||||
// check parent block state
|
if block_state == BlockState::DeadEnd {
|
||||||
match chain.block_state(&block.header().previous_header_hash) {
|
warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str());
|
||||||
BlockState::DeadEnd => {
|
if self.config.close_connection_on_bad_block {
|
||||||
warn!(target: "sync", "Closing connection with peer#{} as it has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str());
|
|
||||||
self.executor.lock().execute(Task::Close(peer_index));
|
self.executor.lock().execute(Task::Close(peer_index));
|
||||||
},
|
}
|
||||||
BlockState::Unknown => {
|
}
|
||||||
|
|
||||||
|
// check parent block state
|
||||||
|
let parent_block_state = chain.block_state(&block.header().previous_header_hash);
|
||||||
|
match parent_block_state {
|
||||||
|
BlockState::Unknown | BlockState::DeadEnd => {
|
||||||
|
if parent_block_state == BlockState::DeadEnd {
|
||||||
|
warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str());
|
||||||
|
if self.config.close_connection_on_bad_block {
|
||||||
|
self.executor.lock().execute(Task::Close(peer_index));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if self.state.is_synchronizing() {
|
if self.state.is_synchronizing() {
|
||||||
// when synchronizing, we tend to receive all blocks in-order
|
// when synchronizing, we tend to receive all blocks in-order
|
||||||
trace!(
|
trace!(
|
||||||
|
|
Loading…
Reference in New Issue