From 124170d2558e6185fc1f7d7f2a0f39172620cfd9 Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky
Date: Mon, 21 Nov 2016 14:10:04 +0300
Subject: [PATCH 1/2] get rid of verification waiter

---
 sync/src/local_node.rs             |  32 ++++-----
 sync/src/synchronization_client.rs |  71 +++++++-------------
 sync/src/synchronization_server.rs | 102 +++++++++++++++++++++--------
 3 files changed, 108 insertions(+), 97 deletions(-)

diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs
index f856489e..deaaf0fe 100644
--- a/sync/src/local_node.rs
+++ b/sync/src/local_node.rs
@@ -98,20 +98,21 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon
 	pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
 		trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
 
-		self.server.serve_getdata(peer_index, message);
+		self.server.serve_getdata(peer_index, message).map(|t| self.server.add_task(peer_index, t));
 	}
 
 	pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
 		trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
 
-		self.server.serve_getblocks(peer_index, message);
+		self.server.serve_getblocks(peer_index, message).map(|t| self.server.add_task(peer_index, t));
 	}
 
 	pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
 		trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
 
 		// do not serve getheaders requests until we are synchronized
-		if self.client.lock().state().is_synchronizing() {
+		let mut client = self.client.lock();
+		if client.state().is_synchronizing() {
 			self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
 			return;
 		}
@@ -119,23 +120,12 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon
 		// simulating bitcoind for passing tests: if we are in nearly-saturated state
 		// and peer, which has just provided a new blocks to us, is asking for headers
 		// => do not serve getheaders until we have fully process his blocks + wait until headers are served before returning
-		let need_wait = {
-			let (need_wait, waiter) = { self.client.lock().get_peers_nearly_blocks_waiter(peer_index) };
-			if let Some(waiter) = waiter {
-				waiter.wait();
-			}
-			need_wait
-		};
-
-		// if we do not need synchronized responses => inform p2p that we have processed this request
-		let id = if !need_wait {
-			self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
-			None
-		} else {
-			Some(id)
-		};
-
-		self.server.serve_getheaders(peer_index, message, id);
+		self.server.serve_getheaders(peer_index, message, Some(id))
+			.map(|task| {
+				let weak_server = Arc::downgrade(&self.server);
+				let task = task.future::(peer_index, weak_server);
+				client.after_peer_nearly_blocks_verified(peer_index, Box::new(task));
+			});
 	}
 
 	pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
@@ -163,7 +153,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon
 	pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
 		trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
 
-		self.server.serve_mempool(peer_index);
+		self.server.serve_mempool(peer_index).map(|t| self.server.add_task(peer_index, t));
 	}
 
 	pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs
index cf5b53a6..71ae4440 100644
--- a/sync/src/synchronization_client.rs
+++ b/sync/src/synchronization_client.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 use std::cmp::{min, max};
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::collections::hash_map::Entry;
-use parking_lot::{Mutex, Condvar};
+use parking_lot::Mutex;
 use futures::{BoxFuture, Future, finished};
 use futures::stream::Stream;
 use tokio_core::reactor::{Handle, Interval};
@@ -187,7 +187,7 @@ pub trait Client : Send + 'static {
 	fn on_peer_block(&mut self, peer_index: usize, block: Block);
 	fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
 	fn on_peer_disconnected(&mut self, peer_index: usize);
-	fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>);
+	fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
 }
 
 /// Synchronization client trait
@@ -201,19 +201,11 @@ pub trait ClientCore : VerificationSink {
 	fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option>;
 	fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option>;
 	fn on_peer_disconnected(&mut self, peer_index: usize);
-	fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>);
+	fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
 	fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option>);
 	fn try_switch_to_saturated_state(&mut self) -> bool;
 }
 
-/// Synchronization peer blocks waiter
-#[derive(Default)]
-pub struct PeersBlocksWaiter {
-	/// Awake mutex
-	peer_blocks_lock: Mutex,
-	/// Awake event
-	peer_blocks_done: Condvar,
-}
 
 /// Synchronization client configuration options.
 pub struct Config {
@@ -249,8 +241,8 @@ pub struct SynchronizationClientCore {
 	orphaned_transactions_pool: OrphanTransactionsPool,
 	/// Verifying blocks by peer
 	verifying_blocks_by_peer: HashMap,
-	/// Verifying blocks waiters
-	verifying_blocks_waiters: HashMap, Option>)>,
+	/// Verifying blocks futures
+	verifying_blocks_futures: HashMap, Vec>)>,
 }
 
 impl Config {
@@ -342,8 +334,8 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri
 		self.core.lock().on_peer_disconnected(peer_index);
 	}
 
-	fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>) {
-		self.core.lock().get_peers_nearly_blocks_waiter(peer_index)
+	fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) {
+		self.core.lock().after_peer_nearly_blocks_verified(peer_index, future);
 	}
 }
 
@@ -511,23 +503,21 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor {
 		}
 	}
 
-	/// Get waiter for verifying blocks
-	fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>) {
+	/// Execute after last block from this peer in NearlySaturated state is verified.
+	/// If there are no verifying blocks from this peer or we are not in the NearlySaturated state => execute immediately.
+	fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) {
 		// if we are currently synchronizing => no need to wait
 		if self.state.is_synchronizing() {
-			return (false, None);
+			future.wait().expect("no-error future");
+			return;
 		}
 
 		// we have to wait until all previous peer requests are server
-		match self.verifying_blocks_waiters.entry(peer_index) {
+		match self.verifying_blocks_futures.entry(peer_index) {
 			Entry::Occupied(mut entry) => {
-				if entry.get().1.is_none() {
-					entry.get_mut().1 = Some(Arc::new(PeersBlocksWaiter::default()));
-				}
-				// also wait until all blocks, supplied by this peer are verified
-				(true, entry.get().1.clone())
+				entry.get_mut().1.push(future);
 			},
-			_ => (true, None),
+			_ => future.wait().expect("no-error future"),
 		}
 	}
 
@@ -721,7 +711,7 @@ impl SynchronizationClientCore where T: TaskExecutor {
 				orphaned_blocks_pool: OrphanBlocksPool::new(),
 				orphaned_transactions_pool: OrphanTransactionsPool::new(),
 				verifying_blocks_by_peer: HashMap::new(),
-				verifying_blocks_waiters: HashMap::new(),
+				verifying_blocks_futures: HashMap::new(),
 			}
 		));
 
@@ -887,14 +877,14 @@ impl SynchronizationClientCore where T: TaskExecutor {
 					chain.verify_blocks(blocks_headers_to_verify);
 					// remember that we are verifying block from this peer
 					self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index);
-					match self.verifying_blocks_waiters.entry(peer_index) {
+					match self.verifying_blocks_futures.entry(peer_index) {
 						Entry::Occupied(mut entry) => {
 							entry.get_mut().0.insert(block_hash.clone());
 						},
 						Entry::Vacant(entry) => {
 							let mut block_hashes = HashSet::new();
 							block_hashes.insert(block_hash.clone());
-							entry.insert((block_hashes, None));
+							entry.insert((block_hashes, Vec::new()));
 						}
 					}
 					result = Some(blocks_to_verify);
@@ -1012,15 +1002,15 @@ impl SynchronizationClientCore where T: TaskExecutor {
 		if let Entry::Occupied(block_entry) = self.verifying_blocks_by_peer.entry(hash.clone()) {
 			let peer_index = *block_entry.get();
 			// find a # of blocks, which this thread has supplied
-			if let Entry::Occupied(mut entry) = self.verifying_blocks_waiters.entry(peer_index) {
+			if let Entry::Occupied(mut entry) = self.verifying_blocks_futures.entry(peer_index) {
 				let is_last_block = {
-					let &mut (ref mut waiting, ref waiter) = entry.get_mut();
+					let &mut (ref mut waiting, ref mut futures) = entry.get_mut();
 					waiting.remove(hash);
 					// if this is the last block => awake waiting threads
 					let is_last_block = waiting.is_empty();
 					if is_last_block {
-						if let Some(ref waiter) = *waiter {
-							waiter.awake();
+						for future in futures.drain(..) {
+							future.wait().expect("no-error future");
 						}
 					}
 					is_last_block
@@ -1056,23 +1046,6 @@ impl SynchronizationClientCore where T: TaskExecutor {
 	}
 }
 
-impl PeersBlocksWaiter {
-	pub fn wait(&self) {
-		let mut locker = self.peer_blocks_lock.lock();
-		if *locker {
-			return;
-		}
-
-		self.peer_blocks_done.wait(&mut locker);
-	}
-
-	pub fn awake(&self) {
-		let mut locker = self.peer_blocks_lock.lock();
-		*locker = true;
-		self.peer_blocks_done.notify_all();
-	}
-}
-
 #[cfg(test)]
 pub mod tests {
 	use std::sync::Arc;
diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs
index b935a595..8dfb93d5 100644
--- a/sync/src/synchronization_server.rs
+++ b/sync/src/synchronization_server.rs
@@ -1,8 +1,9 @@
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
 use std::collections::{VecDeque, HashMap};
 use std::collections::hash_map::Entry;
+use futures::{Future, Poll, Async};
 use parking_lot::{Mutex, Condvar};
 use message::common::{InventoryVector, InventoryType};
 use db;
@@ -13,11 +14,12 @@ use synchronization_executor::{Task, TaskExecutor};
 use message::types;
 
 /// Synchronization requests server trait
-pub trait Server : Send + 'static {
-	fn serve_getdata(&self, peer_index: usize, message: types::GetData);
-	fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
-	fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option);
-	fn serve_mempool(&self, peer_index: usize);
+pub trait Server : Send + Sync + 'static {
+	fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option;
+	fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option;
+	fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option;
+	fn serve_mempool(&self, peer_index: usize) -> Option;
+	fn add_task(&self, peer_index: usize, task: IndexedServerTask);
 }
 
 /// Synchronization requests server
@@ -28,6 +30,7 @@ pub struct SynchronizationServer {
 	worker_thread: Option>,
 }
 
+/// Server tasks queue
 struct ServerQueue {
 	is_stopping: AtomicBool,
 	queue_ready: Arc,
@@ -85,6 +88,37 @@ impl IndexedServerTask {
 	fn ignore(id: u32) -> Self {
 		IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
 	}
+
+	pub fn future(self, peer_index: usize, server: Weak) -> IndexedServerTaskFuture {
+		IndexedServerTaskFuture::::new(server, peer_index, self)
+	}
+}
+
+/// Future server task execution
+pub struct IndexedServerTaskFuture {
+	server: Weak,
+	peer_index: usize,
+	task: Option,
+}
+
+impl IndexedServerTaskFuture where T: Server {
+	pub fn new(server: Weak, peer_index: usize, task: IndexedServerTask) -> Self {
+		IndexedServerTaskFuture {
+			server: server,
+			peer_index: peer_index,
+			task: Some(task),
+		}
+	}
+}
+
+impl Future for IndexedServerTaskFuture where T: Server {
+	type Item = ();
+	type Error = ();
+
+	fn poll(&mut self) -> Poll {
+		self.task.take().map(|t| self.server.upgrade().map(|s| s.add_task(self.peer_index, t)));
+		Ok(Async::Ready(()))
+	}
 }
 
 #[derive(Debug, PartialEq)]
@@ -335,39 +369,46 @@ impl Drop for SynchronizationServer {
 }
 
 impl Server for SynchronizationServer {
-	fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
+	fn serve_getdata(&self, _peer_index: usize, message: types::GetData) -> Option {
 		let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None);
-		self.queue.lock().add_task(peer_index, task);
+		Some(task)
 	}
 
-	fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
+	fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option {
 		if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
 			trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}",
 				peer_index, best_common_block.number, best_common_block.hash);
 			let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None);
-			self.queue.lock().add_task(peer_index, task);
+			Some(task)
 		} else {
 			trace!(target: "sync", "No common blocks with peer#{}", peer_index);
+			None
 		}
 	}
 
-	fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) {
+	fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option {
 		if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
 			trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}",
 				peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
 			let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id));
 			let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
-			self.queue.lock().add_task(peer_index, task);
+			Some(task)
 		} else {
 			trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
 			if let Some(id) = id {
-				self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
+				Some(IndexedServerTask::ignore(id))
+			} else {
+				None
 			}
 		}
 	}
 
-	fn serve_mempool(&self, peer_index: usize) {
+	fn serve_mempool(&self, _peer_index: usize) -> Option {
 		let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None);
+		Some(task)
+	}
+
+	fn add_task(&self, peer_index: usize, task: IndexedServerTask) {
 		self.queue.lock().add_task(peer_index, task);
 	}
 }
@@ -461,7 +502,7 @@ pub mod tests {
 	use synchronization_executor::Task;
 	use synchronization_executor::tests::DummyTaskExecutor;
 	use synchronization_chain::Chain;
-	use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex};
+	use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex, IndexedServerTask};
 	pub struct DummyServer {
 		tasks: Mutex>,
 	}
@@ -480,26 +521,33 @@ pub mod tests {
 	}
 
 	impl Server for DummyServer {
-		fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
+		fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option {
 			self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
+			None
 		}
 
-		fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
+		fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option {
 			self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
 				number: 0,
 				hash: message.block_locator_hashes[0].clone(),
 			}, message.hash_stop)));
+			None
 		}
 
-		fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option) {
+		fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option) -> Option {
 			self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
 				number: 0,
 				hash: message.block_locator_hashes[0].clone(),
 			}, message.hash_stop)));
+			None
 		}
 
-		fn serve_mempool(&self, peer_index: usize) {
+		fn serve_mempool(&self, peer_index: usize) -> Option {
			self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
+			None
+		}
+
+		fn add_task(&self, _peer_index: usize, _task: IndexedServerTask) {
 		}
 	}
@@ -522,7 +570,7 @@ pub mod tests {
 		];
 		server.serve_getdata(0, types::GetData {
 			inventory: inventory.clone(),
-		});
+		}).map(|t| server.add_task(0, t));
 		// => respond with notfound
 		let tasks = DummyTaskExecutor::wait_tasks(executor);
 		assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
@@ -540,7 +588,7 @@ pub mod tests {
 		];
 		server.serve_getdata(0, types::GetData {
 			inventory: inventory.clone(),
-		});
+		}).map(|t| server.add_task(0, t));
 		// => respond with block
 		let tasks = DummyTaskExecutor::wait_tasks(executor);
 		assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);
@@ -555,7 +603,7 @@ pub mod tests {
 			version: 0,
 			block_locator_hashes: vec![genesis_block_hash.clone()],
 			hash_stop: H256::default(),
-		});
+		}).map(|t| server.add_task(0, t));
 		// => no response
 		let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
 		assert_eq!(tasks, vec![]);
@@ -570,7 +618,7 @@ pub mod tests {
 			version: 0,
 			block_locator_hashes: vec![test_data::genesis().hash()],
 			hash_stop: H256::default(),
-		});
+		}).map(|t| server.add_task(0, t));
 		// => responds with inventory
 		let inventory = vec![InventoryVector {
 			inv_type: InventoryType::MessageBlock,
@@ -590,7 +638,7 @@ pub mod tests {
 			version: 0,
 			block_locator_hashes: vec![genesis_block_hash.clone()],
 			hash_stop: H256::default(),
-		}, Some(dummy_id));
+		}, Some(dummy_id)).map(|t| server.add_task(0, t));
 		// => no response
 		let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
 		assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
@@ -606,7 +654,7 @@ pub mod tests {
 			version: 0,
 			block_locator_hashes: vec![test_data::genesis().hash()],
 			hash_stop: H256::default(),
-		}, Some(dummy_id));
+		}, Some(dummy_id)).map(|t| server.add_task(0, t));
 		// => responds with headers
 		let headers = vec![
 			test_data::block_h1().block_header,
@@ -619,7 +667,7 @@ pub mod tests {
 	fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
 		let (_, executor, server) = create_synchronization_server();
 		// when asking for memory pool transactions ids
-		server.serve_mempool(0);
+		server.serve_mempool(0).map(|t| server.add_task(0, t));
 		// => no response
 		let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
 		assert_eq!(tasks, vec![]);
@@ -633,7 +681,7 @@ pub mod tests {
 		let transaction_hash = transaction.hash();
 		chain.write().insert_verified_transaction(transaction);
 		// when asking for memory pool transactions ids
-		server.serve_mempool(0);
+		server.serve_mempool(0).map(|t| server.add_task(0, t));
 		// => respond with inventory
 		let inventory = vec![InventoryVector {
 			inv_type: InventoryType::MessageTx,

From 21cee15d8dea58941b7b6c7622575a4d84e58b14 Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky
Date: Mon, 21 Nov 2016 15:33:13 +0300
Subject: [PATCH 2/2] use futures::lazy instead of manual IndexedServerTaskFuture

---
 sync/src/synchronization_server.rs | 36 +++++-------------------------
 1 file changed, 6 insertions(+), 30 deletions(-)

diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs
index 8dfb93d5..6ac83264 100644
--- a/sync/src/synchronization_server.rs
+++ b/sync/src/synchronization_server.rs
@@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
 use std::collections::{VecDeque, HashMap};
 use std::collections::hash_map::Entry;
-use futures::{Future, Poll, Async};
+use futures::{Future, BoxFuture, lazy, finished};
 use parking_lot::{Mutex, Condvar};
 use message::common::{InventoryVector, InventoryType};
 use db;
@@ -89,35 +89,11 @@ impl IndexedServerTask {
 		IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
 	}
 
-	pub fn future(self, peer_index: usize, server: Weak) -> IndexedServerTaskFuture {
-		IndexedServerTaskFuture::::new(server, peer_index, self)
-	}
-}
-
-/// Future server task execution
-pub struct IndexedServerTaskFuture {
-	server: Weak,
-	peer_index: usize,
-	task: Option,
-}
-
-impl IndexedServerTaskFuture where T: Server {
-	pub fn new(server: Weak, peer_index: usize, task: IndexedServerTask) -> Self {
-		IndexedServerTaskFuture {
-			server: server,
-			peer_index: peer_index,
-			task: Some(task),
-		}
-	}
-}
-
-impl Future for IndexedServerTaskFuture where T: Server {
-	type Item = ();
-	type Error = ();
-
-	fn poll(&mut self) -> Poll {
-		self.task.take().map(|t| self.server.upgrade().map(|s| s.add_task(self.peer_index, t)));
-		Ok(Async::Ready(()))
+	pub fn future(self, peer_index: usize, server: Weak) -> BoxFuture<(), ()> {
+		lazy(move || {
+			server.upgrade().map(|s| s.add_task(peer_index, self));
+			finished::<(), ()>(())
+		}).boxed()
 	}
 }
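Note on the pattern these two patches introduce: the deferred "enqueue a server task when a future is driven" idea can be exercised in isolation. The sketch below is a minimal, self-contained approximation using the futures 0.1 API that the patch itself imports (lazy, finished, boxed, wait); the QueueServer type, its add_task signature and the String task payload are stand-ins for illustration, not the crate's real types. A futures::lazy closure captures a Weak handle to the server and only enqueues the task when the future is finally polled, which in the client happens once after_peer_nearly_blocks_verified decides the peer's blocks are done (or immediately, via wait, when there is nothing to wait for).

extern crate futures;

use std::sync::{Arc, Mutex, Weak};
use futures::{lazy, finished, BoxFuture, Future};

/// Stand-in for the synchronization server: it only records queued tasks.
struct QueueServer {
    tasks: Mutex<Vec<(usize, String)>>,
}

impl QueueServer {
    fn add_task(&self, peer_index: usize, task: String) {
        self.tasks.lock().unwrap().push((peer_index, task));
    }
}

/// Wrap "enqueue this task for this peer" into a lazily-evaluated future.
/// Nothing happens until the future is polled; if the server is already gone,
/// the Weak upgrade fails and the task is silently dropped.
fn task_future(server: Weak<QueueServer>, peer_index: usize, task: String) -> BoxFuture<(), ()> {
    lazy(move || {
        if let Some(server) = server.upgrade() {
            server.add_task(peer_index, task);
        }
        finished::<(), ()>(())
    }).boxed()
}

fn main() {
    let server = Arc::new(QueueServer { tasks: Mutex::new(Vec::new()) });

    // Building the future does not enqueue anything yet.
    let deferred = task_future(Arc::downgrade(&server), 0, "getheaders response".to_string());
    assert!(server.tasks.lock().unwrap().is_empty());

    // Driving it (here synchronously via `wait`, as the client does on its
    // "execute immediately" branches) finally queues the task.
    deferred.wait().expect("no-error future");
    assert_eq!(server.tasks.lock().unwrap().len(), 1);
}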