Merge pull request #156 from ethcore/do_not_block_p2p_thread

Get rid of verification waiter
Nikolay Volf 2016-11-21 15:41:10 +03:00 committed by GitHub
commit ef5079125f
3 changed files with 84 additions and 97 deletions
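The change in outline: the p2p thread used to park on a Condvar-based PeersBlocksWaiter until a peer's in-flight blocks finished verification before answering getheaders; now the pending response is kept as a BoxFuture<(), ()> per peer and executed once the last of that peer's blocks is verified, or immediately if nothing is pending. Below is a minimal, self-contained sketch of that registry pattern in the futures 0.1 style the crate uses; DeferredByPeer and the String block hashes are illustrative stand-ins, not code from the diff (the real code keys blocks by H256 inside SynchronizationClientCore).

use std::collections::{HashMap, HashSet};
use futures::{BoxFuture, Future};

// Illustrative stand-in for the per-peer bookkeeping the diff introduces:
// the set of block hashes still being verified for a peer, plus the futures
// to run once the last of them completes.
struct DeferredByPeer {
    pending: HashMap<usize, (HashSet<String>, Vec<BoxFuture<(), ()>>)>,
}

impl DeferredByPeer {
    // Register deferred work for a peer: run it right away if the peer has
    // no blocks in verification, otherwise queue it.
    fn run_or_defer(&mut self, peer: usize, work: BoxFuture<(), ()>) {
        match self.pending.get_mut(&peer) {
            Some(entry) => entry.1.push(work),
            None => work.wait().expect("no-error future"),
        }
    }

    // Called from the verification thread each time one of `peer`'s blocks
    // finishes; fires the queued futures once the last block is done.
    fn on_block_verified(&mut self, peer: usize, hash: &str) {
        if let Some(entry) = self.pending.get_mut(&peer) {
            entry.0.remove(hash);
            if entry.0.is_empty() {
                for future in entry.1.drain(..) {
                    future.wait().expect("no-error future");
                }
            }
        }
    }
}

The real implementation routes this through the hash_map Entry API and also tracks which peer supplied each block via verifying_blocks_by_peer; the sketch keeps only the run-now-or-defer decision.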

View File

@ -98,20 +98,21 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message);
self.server.serve_getdata(peer_index, message).map(|t| self.server.add_task(peer_index, t));
}
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
self.server.serve_getblocks(peer_index, message);
self.server.serve_getblocks(peer_index, message).map(|t| self.server.add_task(peer_index, t));
}
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
// do not serve getheaders requests until we are synchronized
if self.client.lock().state().is_synchronizing() {
let mut client = self.client.lock();
if client.state().is_synchronizing() {
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
return;
}
@ -119,23 +120,12 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// simulating bitcoind for passing tests: if we are in nearly-saturated state
// and a peer which has just provided new blocks to us is asking for headers
// => do not serve getheaders until we have fully processed its blocks + wait until headers are served before returning
let need_wait = {
let (need_wait, waiter) = { self.client.lock().get_peers_nearly_blocks_waiter(peer_index) };
if let Some(waiter) = waiter {
waiter.wait();
}
need_wait
};
// if we do not need synchronized responses => inform p2p that we have processed this request
let id = if !need_wait {
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
None
} else {
Some(id)
};
self.server.serve_getheaders(peer_index, message, id);
self.server.serve_getheaders(peer_index, message, Some(id))
.map(|task| {
let weak_server = Arc::downgrade(&self.server);
let task = task.future::<U>(peer_index, weak_server);
client.after_peer_nearly_blocks_verified(peer_index, Box::new(task));
});
}
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
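One detail of the rewritten getheaders handler above: the client mutex is now taken once at the top (let mut client = self.client.lock();) and that same guard is still held when the future is registered, so the is_synchronizing() check and the after_peer_nearly_blocks_verified() call sit in a single critical section rather than two separate lock acquisitions as before. A minimal sketch of that shape, with an illustrative ClientState in place of the real client (the real handler also sends an Ignore task before returning early, which the sketch drops):

use futures::BoxFuture;
use parking_lot::Mutex;

// Illustrative stand-in for the synchronization client state.
struct ClientState {
    synchronizing: bool,
    deferred: Vec<BoxFuture<(), ()>>,
}

fn register_after_verification(client: &Mutex<ClientState>, work: BoxFuture<(), ()>) {
    // One guard covers both the state check and the registration, so the
    // state cannot flip between the two steps.
    let mut state = client.lock();
    if state.synchronizing {
        return;
    }
    state.deferred.push(work);
}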
@ -163,7 +153,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
self.server.serve_mempool(peer_index);
self.server.serve_mempool(peer_index).map(|t| self.server.add_task(peer_index, t));
}
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use std::cmp::{min, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::hash_map::Entry;
use parking_lot::{Mutex, Condvar};
use parking_lot::Mutex;
use futures::{BoxFuture, Future, finished};
use futures::stream::Stream;
use tokio_core::reactor::{Handle, Interval};
@ -187,7 +187,7 @@ pub trait Client : Send + 'static {
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
}
/// Synchronization client trait
@ -201,19 +201,11 @@ pub trait ClientCore : VerificationSink {
fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option<VecDeque<(H256, Block)>>;
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>>;
fn on_peer_disconnected(&mut self, peer_index: usize);
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>);
fn try_switch_to_saturated_state(&mut self) -> bool;
}
/// Synchronization peer blocks waiter
#[derive(Default)]
pub struct PeersBlocksWaiter {
/// Awake mutex
peer_blocks_lock: Mutex<bool>,
/// Awake event
peer_blocks_done: Condvar,
}
/// Synchronization client configuration options.
pub struct Config {
@ -249,8 +241,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
orphaned_transactions_pool: OrphanTransactionsPool,
/// Verifying blocks by peer
verifying_blocks_by_peer: HashMap<H256, usize>,
/// Verifying blocks waiters
verifying_blocks_waiters: HashMap<usize, (HashSet<H256>, Option<Arc<PeersBlocksWaiter>>)>,
/// Verifying blocks futures
verifying_blocks_futures: HashMap<usize, (HashSet<H256>, Vec<BoxFuture<(), ()>>)>,
}
impl Config {
@ -342,8 +334,8 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().on_peer_disconnected(peer_index);
}
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>) {
self.core.lock().get_peers_nearly_blocks_waiter(peer_index)
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) {
self.core.lock().after_peer_nearly_blocks_verified(peer_index, future);
}
}
@ -511,23 +503,21 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Get waiter for verifying blocks
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>) {
/// Execute after last block from this peer in NearlySaturated state is verified.
/// If there are no verifying blocks from this peer or we are not in the NearlySaturated state => execute immediately.
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) {
// if we are currently synchronizing => no need to wait
if self.state.is_synchronizing() {
return (false, None);
future.wait().expect("no-error future");
return;
}
// we have to wait until all previous peer requests are served
match self.verifying_blocks_waiters.entry(peer_index) {
match self.verifying_blocks_futures.entry(peer_index) {
Entry::Occupied(mut entry) => {
if entry.get().1.is_none() {
entry.get_mut().1 = Some(Arc::new(PeersBlocksWaiter::default()));
}
// also wait until all blocks, supplied by this peer are verified
(true, entry.get().1.clone())
entry.get_mut().1.push(future);
},
_ => (true, None),
_ => future.wait().expect("no-error future"),
}
}
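A note on the future.wait().expect("no-error future") calls above: the futures passed to after_peer_nearly_blocks_verified are built with futures::lazy (see IndexedServerTask::future in the third file of this diff), so wait() just runs the closure synchronously on the current thread and returns; no event loop is involved. A tiny demonstration of that behaviour, assuming futures 0.1:

use std::cell::Cell;
use futures::{Future, finished, lazy};

fn main() {
    let ran = Cell::new(false);
    // `lazy` defers the closure until the future is polled...
    let future = lazy(|| {
        ran.set(true);
        finished::<(), ()>(())
    });
    // ...and `wait()` polls it to completion right here, synchronously.
    future.wait().expect("no-error future");
    assert!(ran.get());
}

In this PR the closure only enqueues a server task, so driving it with wait() is cheap wherever it happens.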
@ -721,7 +711,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
orphaned_blocks_pool: OrphanBlocksPool::new(),
orphaned_transactions_pool: OrphanTransactionsPool::new(),
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_waiters: HashMap::new(),
verifying_blocks_futures: HashMap::new(),
}
));
@ -887,14 +877,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
chain.verify_blocks(blocks_headers_to_verify);
// remember that we are verifying block from this peer
self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index);
match self.verifying_blocks_waiters.entry(peer_index) {
match self.verifying_blocks_futures.entry(peer_index) {
Entry::Occupied(mut entry) => {
entry.get_mut().0.insert(block_hash.clone());
},
Entry::Vacant(entry) => {
let mut block_hashes = HashSet::new();
block_hashes.insert(block_hash.clone());
entry.insert((block_hashes, None));
entry.insert((block_hashes, Vec::new()));
}
}
result = Some(blocks_to_verify);
@ -1012,15 +1002,15 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
if let Entry::Occupied(block_entry) = self.verifying_blocks_by_peer.entry(hash.clone()) {
let peer_index = *block_entry.get();
// find a # of blocks, which this thread has supplied
if let Entry::Occupied(mut entry) = self.verifying_blocks_waiters.entry(peer_index) {
if let Entry::Occupied(mut entry) = self.verifying_blocks_futures.entry(peer_index) {
let is_last_block = {
let &mut (ref mut waiting, ref waiter) = entry.get_mut();
let &mut (ref mut waiting, ref mut futures) = entry.get_mut();
waiting.remove(hash);
// if this is the last block => awake waiting threads
let is_last_block = waiting.is_empty();
if is_last_block {
if let Some(ref waiter) = *waiter {
waiter.awake();
for future in futures.drain(..) {
future.wait().expect("no-error future");
}
}
is_last_block
@ -1056,23 +1046,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
impl PeersBlocksWaiter {
pub fn wait(&self) {
let mut locker = self.peer_blocks_lock.lock();
if *locker {
return;
}
self.peer_blocks_done.wait(&mut locker);
}
pub fn awake(&self) {
let mut locker = self.peer_blocks_lock.lock();
*locker = true;
self.peer_blocks_done.notify_all();
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
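For reference, the deleted PeersBlocksWaiter, reassembled from the struct removed near the top of this file and the impl removed just above, with comments added. Its wait() is what used to park the calling p2p thread inside on_peer_getheaders until the verification thread called awake():

use parking_lot::{Condvar, Mutex};

/// Synchronization peer blocks waiter (removed by this PR).
#[derive(Default)]
pub struct PeersBlocksWaiter {
    /// Awake mutex: set to true once the peer's blocks are verified.
    peer_blocks_lock: Mutex<bool>,
    /// Awake event.
    peer_blocks_done: Condvar,
}

impl PeersBlocksWaiter {
    pub fn wait(&self) {
        let mut locker = self.peer_blocks_lock.lock();
        // If awake() already ran, the flag is set and we return immediately.
        if *locker {
            return;
        }
        // Otherwise block the calling (p2p) thread until awake() fires.
        self.peer_blocks_done.wait(&mut locker);
    }
    pub fn awake(&self) {
        let mut locker = self.peer_blocks_lock.lock();
        *locker = true;
        self.peer_blocks_done.notify_all();
    }
}

Serving getheaders could therefore stall message handling until verification caught up, which is presumably the behaviour the branch name do_not_block_p2p_thread refers to; the futures-based replacement removes that wait.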

View File

@ -1,8 +1,9 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::collections::{VecDeque, HashMap};
use std::collections::hash_map::Entry;
use futures::{Future, BoxFuture, lazy, finished};
use parking_lot::{Mutex, Condvar};
use message::common::{InventoryVector, InventoryType};
use db;
@ -13,11 +14,12 @@ use synchronization_executor::{Task, TaskExecutor};
use message::types;
/// Synchronization requests server trait
pub trait Server : Send + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>);
fn serve_mempool(&self, peer_index: usize);
pub trait Server : Send + Sync + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option<IndexedServerTask>;
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask>;
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask>;
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask>;
fn add_task(&self, peer_index: usize, task: IndexedServerTask);
}
/// Synchronization requests server
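With the reshaped trait above, the serve_* methods only build a task (or return None) and the caller decides when, and whether, to enqueue it via add_task. That split is what lets the LocalNode handlers in the first file either enqueue the task immediately or wrap it into a future and defer it until the peer's blocks are verified. The call sites in this PR use Option::map purely for its side effect; a hypothetical helper spelling the same thing out, assuming this module's Server trait and types::GetData are in scope:

// Hypothetical helper, not part of the PR: the if-let spelling of the
// `serve_getdata(..).map(|t| server.add_task(..., t))` pattern used at the
// real call sites.
fn serve_and_enqueue<S: Server>(server: &S, peer_index: usize, message: types::GetData) {
    if let Some(task) = server.serve_getdata(peer_index, message) {
        server.add_task(peer_index, task);
    }
}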
@ -28,6 +30,7 @@ pub struct SynchronizationServer {
worker_thread: Option<thread::JoinHandle<()>>,
}
/// Server tasks queue
struct ServerQueue {
is_stopping: AtomicBool,
queue_ready: Arc<Condvar>,
@ -85,6 +88,13 @@ impl IndexedServerTask {
fn ignore(id: u32) -> Self {
IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
}
pub fn future<T: Server>(self, peer_index: usize, server: Weak<T>) -> BoxFuture<(), ()> {
lazy(move || {
server.upgrade().map(|s| s.add_task(peer_index, self));
finished::<(), ()>(())
}).boxed()
}
}
#[derive(Debug, PartialEq)]
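IndexedServerTask::future above captures only a Weak reference to the server, so a deferred task stored inside the synchronization client does not keep the SynchronizationServer alive; if the server has already been dropped by the time the future runs, upgrade() yields None and the task is simply discarded. A condensed restatement of that pattern with an illustrative trait in place of Server, assuming futures 0.1:

use std::sync::Weak;
use futures::{BoxFuture, Future, finished, lazy};

// Illustrative stand-ins for the real Server trait and IndexedServerTask.
trait TaskSink: Send + Sync + 'static {
    fn add_task(&self, peer_index: usize, task: u32);
}

fn deferred_add_task<S: TaskSink>(task: u32, peer_index: usize, sink: Weak<S>) -> BoxFuture<(), ()> {
    lazy(move || {
        // Nothing runs until the future is polled; by then the sink may be gone.
        if let Some(sink) = sink.upgrade() {
            sink.add_task(peer_index, task);
        }
        finished::<(), ()>(())
    }).boxed()
}

At the call site in on_peer_getheaders (first file), the Weak comes from Arc::downgrade(&self.server) just before the future is handed to the client.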
@ -335,39 +345,46 @@ impl Drop for SynchronizationServer {
}
impl Server for SynchronizationServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
fn serve_getdata(&self, _peer_index: usize, message: types::GetData) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None);
self.queue.lock().add_task(peer_index, task);
Some(task)
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None);
self.queue.lock().add_task(peer_index, task);
Some(task)
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
None
}
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask> {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id));
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
self.queue.lock().add_task(peer_index, task);
Some(task)
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
if let Some(id) = id {
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
Some(IndexedServerTask::ignore(id))
} else {
None
}
}
}
fn serve_mempool(&self, peer_index: usize) {
fn serve_mempool(&self, _peer_index: usize) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None);
Some(task)
}
fn add_task(&self, peer_index: usize, task: IndexedServerTask) {
self.queue.lock().add_task(peer_index, task);
}
}
@ -461,7 +478,7 @@ pub mod tests {
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex};
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex, IndexedServerTask};
pub struct DummyServer {
tasks: Mutex<Vec<(usize, ServerTask)>>,
@ -480,26 +497,33 @@ pub mod tests {
}
impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
None
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
None
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option<u32>) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option<u32>) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
None
}
fn serve_mempool(&self, peer_index: usize) {
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
None
}
fn add_task(&self, _peer_index: usize, _task: IndexedServerTask) {
}
}
@ -522,7 +546,7 @@ pub mod tests {
];
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}).map(|t| server.add_task(0, t));
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
@ -540,7 +564,7 @@ pub mod tests {
];
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}).map(|t| server.add_task(0, t));
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);
@ -555,7 +579,7 @@ pub mod tests {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}).map(|t| server.add_task(0, t));
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
@ -570,7 +594,7 @@ pub mod tests {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}).map(|t| server.add_task(0, t));
// => responds with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
@ -590,7 +614,7 @@ pub mod tests {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
}, Some(dummy_id));
}, Some(dummy_id)).map(|t| server.add_task(0, t));
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
@ -606,7 +630,7 @@ pub mod tests {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
}, Some(dummy_id));
}, Some(dummy_id)).map(|t| server.add_task(0, t));
// => responds with headers
let headers = vec![
test_data::block_h1().block_header,
@ -619,7 +643,7 @@ pub mod tests {
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
let (_, executor, server) = create_synchronization_server();
// when asking for memory pool transactions ids
server.serve_mempool(0);
server.serve_mempool(0).map(|t| server.add_task(0, t));
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
@ -633,7 +657,7 @@ pub mod tests {
let transaction_hash = transaction.hash();
chain.write().insert_verified_transaction(transaction);
// when asking for memory pool transactions ids
server.serve_mempool(0);
server.serve_mempool(0).map(|t| server.add_task(0, t));
// => respond with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageTx,