serve getheaders && getdata completely in server thread

This commit is contained in:
Svyatoslav Nikolsky 2016-11-29 14:52:28 +03:00
parent 270a04c887
commit 2526bcefdb
2 changed files with 58 additions and 81 deletions

View File

@ -67,8 +67,6 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
trace!(target: "sync", "Starting new sync session with peer#{}", peer_index);
// request inventory from peer
// TODO: bitcoind doesn't respond to the `getheaders` request while it is synchronizing
// but it answers to the `inventory` request
self.executor.lock().execute(SynchronizationTask::RequestBlocksHeaders(peer_index));
}

View File

@ -9,7 +9,7 @@ use message::common::{InventoryVector, InventoryType};
use db;
use chain::{BlockHeader, Transaction};
use primitives::hash::H256;
use synchronization_chain::{ChainRef, TransactionState};
use synchronization_chain::{Chain, ChainRef, TransactionState};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_client::FilteredInventory;
use message::types;
@ -26,7 +26,6 @@ pub trait Server : Send + Sync + 'static {
/// Synchronization requests server
pub struct SynchronizationServer {
chain: ChainRef,
queue_ready: Arc<Condvar>,
queue: Arc<Mutex<ServerQueue>>,
worker_thread: Option<thread::JoinHandle<()>>,
@ -87,10 +86,6 @@ impl IndexedServerTask {
}
impl IndexedServerTask {
fn ignore(id: u32) -> Self {
IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
}
pub fn future<T: Server>(self, peer_index: usize, server: Weak<T>) -> BoxFuture<(), ()> {
lazy(move || {
server.upgrade().map(|s| s.add_task(peer_index, self));
@ -102,15 +97,14 @@ impl IndexedServerTask {
#[derive(Debug, PartialEq)]
pub enum ServerTask {
ServeGetData(FilteredInventory),
ServeGetBlocks(db::BestBlock, H256),
ServeGetHeaders(db::BestBlock, H256),
ServeGetBlocks(Vec<H256>, H256),
ServeGetHeaders(Vec<H256>, H256),
ServeGetBlockTxn(H256, Vec<usize>),
ServeMempool,
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
ReturnMerkleBlock(types::MerkleBlock),
ReturnTransaction(Transaction),
Ignore,
}
impl SynchronizationServer {
@ -118,7 +112,6 @@ impl SynchronizationServer {
let queue_ready = Arc::new(Condvar::new());
let queue = Arc::new(Mutex::new(ServerQueue::new(queue_ready.clone())));
let mut server = SynchronizationServer {
chain: chain.clone(),
queue_ready: queue_ready.clone(),
queue: queue.clone(),
worker_thread: None,
@ -129,14 +122,14 @@ impl SynchronizationServer {
server
}
fn locate_known_block_hash(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
block_locator_hashes.into_iter()
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&self.chain, &hash))
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash))
.nth(0)
}
fn locate_known_block_header(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
self.locate_known_block_hash(block_locator_hashes)
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes)
}
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
@ -217,31 +210,50 @@ impl SynchronizationServer {
queue.lock().task_processed(peer_index);
},
// `getblocks` => `inventory`
ServerTask::ServeGetBlocks(best_block, hash_stop) => {
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
assert_eq!(indexed_task.id, ServerTaskIndex::None);
ServerTask::ServeGetBlocks(block_locator_hashes, hash_stop) => {
assert_eq!(indexed_task.id, ServerTaskIndex::None);
let chain = chain.read();
if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
}
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getheaders` => `headers`
ServerTask::ServeGetHeaders(best_block, hash_stop) => {
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000);
if !blocks_headers.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
} else if let Some(response_id) = indexed_task.id.raw() {
executor.lock().execute(Task::Ignore(peer_index, response_id));
ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => {
let chain = chain.read();
if let Some(best_common_block) = SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000);
if !blocks_headers.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
} else if let Some(response_id) = indexed_task.id.raw() {
executor.lock().execute(Task::Ignore(peer_index, response_id));
}
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
if let Some(response_id) = indexed_task.id.raw() {
executor.lock().execute(Task::Ignore(peer_index, response_id));
}
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
@ -326,18 +338,11 @@ impl SynchronizationServer {
ServerTask::ReturnTransaction(transaction) => {
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
}
// ignore
ServerTask::Ignore => {
let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task");
executor.lock().execute(Task::Ignore(peer_index, response_id));
queue.lock().task_processed(peer_index);
},
}
}
}
fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
let chain = chain.read();
fn blocks_hashes_after(chain: &Chain, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
// check that chain has not reorganized since task was queued
if chain.block_hash(best_block.number).map(|h| h != best_block.hash).unwrap_or(true) {
return Vec::new();
@ -354,8 +359,7 @@ impl SynchronizationServer {
.collect()
}
fn blocks_headers_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<BlockHeader> {
let chain = chain.read();
fn blocks_headers_after(chain: &Chain, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<BlockHeader> {
// check that chain has not reorganized since task was queued
if chain.block_hash(best_block.number).map(|h| h != best_block.hash).unwrap_or(true) {
return Vec::new();
@ -373,8 +377,7 @@ impl SynchronizationServer {
}
fn locate_best_known_block_hash(chain: &ChainRef, hash: &H256) -> Option<db::BestBlock> {
let chain = chain.read();
fn locate_best_known_block_hash(chain: &Chain, hash: &H256) -> Option<db::BestBlock> {
match chain.block_number(hash) {
Some(number) => Some(db::BestBlock {
number: number,
@ -420,33 +423,15 @@ impl Server for SynchronizationServer {
Some(task)
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None);
Some(task)
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
None
}
fn serve_getblocks(&self, _peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(message.block_locator_hashes, message.hash_stop), ServerTaskIndex::None);
Some(task)
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask> {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final);
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
Some(task)
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
if let Some(id) = id {
Some(IndexedServerTask::ignore(id))
} else {
None
}
}
fn serve_getheaders(&self, _peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask> {
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final);
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(message.block_locator_hashes, message.hash_stop), server_task_index);
Some(task)
}
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
@ -581,18 +566,12 @@ pub mod tests {
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(message.block_locator_hashes, message.hash_stop)));
None
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option<u32>) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(message.block_locator_hashes, message.hash_stop)));
None
}