getting rid of sync waits
This commit is contained in:
parent
47ee3c2a82
commit
964ac87135
|
@ -18,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync {
|
|||
fn start_sync_session(&self, version: u32);
|
||||
fn close_session(&self);
|
||||
fn on_inventory(&self, message: types::Inv);
|
||||
fn on_getdata(&self, message: types::GetData, id: u32);
|
||||
fn on_getblocks(&self, message: types::GetBlocks, id: u32);
|
||||
fn on_getdata(&self, message: types::GetData);
|
||||
fn on_getblocks(&self, message: types::GetBlocks);
|
||||
fn on_getheaders(&self, message: types::GetHeaders, id: u32);
|
||||
fn on_transaction(&self, message: types::Tx);
|
||||
fn on_block(&self, message: types::Block);
|
||||
fn on_headers(&self, message: types::Headers);
|
||||
fn on_mempool(&self, message: types::MemPool, id: u32);
|
||||
fn on_mempool(&self, message: types::MemPool);
|
||||
fn on_filterload(&self, message: types::FilterLoad);
|
||||
fn on_filteradd(&self, message: types::FilterAdd);
|
||||
fn on_filterclear(&self, message: types::FilterClear);
|
||||
|
@ -39,13 +39,14 @@ pub trait InboundSyncConnection : Send + Sync {
|
|||
}
|
||||
|
||||
pub trait OutboundSyncConnection : Send + Sync {
|
||||
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool);
|
||||
fn send_inventory(&self, message: &types::Inv);
|
||||
fn send_getdata(&self, message: &types::GetData);
|
||||
fn send_getblocks(&self, message: &types::GetBlocks);
|
||||
fn send_getheaders(&self, message: &types::GetHeaders);
|
||||
fn send_transaction(&self, message: &types::Tx);
|
||||
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
|
||||
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
|
||||
fn send_block(&self, message: &types::Block);
|
||||
fn send_headers(&self, message: &types::Headers);
|
||||
fn respond_headers(&self, message: &types::Headers, id: u32);
|
||||
fn send_mempool(&self, message: &types::MemPool);
|
||||
fn send_filterload(&self, message: &types::FilterLoad);
|
||||
fn send_filteradd(&self, message: &types::FilterAdd);
|
||||
|
@ -57,7 +58,7 @@ pub trait OutboundSyncConnection : Send + Sync {
|
|||
fn send_compact_block(&self, message: &types::CompactBlock);
|
||||
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
|
||||
fn send_block_txn(&self, message: &types::BlockTxn);
|
||||
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool);
|
||||
fn send_notfound(&self, message: &types::NotFound);
|
||||
fn ignored(&self, id: u32);
|
||||
}
|
||||
|
||||
|
@ -78,8 +79,8 @@ impl OutboundSync {
|
|||
}
|
||||
|
||||
impl OutboundSyncConnection for OutboundSync {
|
||||
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
fn send_inventory(&self, message: &types::Inv) {
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_getdata(&self, message: &types::GetData) {
|
||||
|
@ -98,12 +99,16 @@ impl OutboundSyncConnection for OutboundSync {
|
|||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_block(&self, message: &types::Block, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
fn send_block(&self, message: &types::Block) {
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
fn send_headers(&self, message: &types::Headers) {
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn respond_headers(&self, message: &types::Headers, id: u32) {
|
||||
self.context.send_response(message, id, true);
|
||||
}
|
||||
|
||||
fn send_mempool(&self, message: &types::MemPool) {
|
||||
|
@ -150,8 +155,8 @@ impl OutboundSyncConnection for OutboundSync {
|
|||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
fn send_notfound(&self, message: &types::NotFound) {
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn ignored(&self, id: u32) {
|
||||
|
@ -188,15 +193,11 @@ impl Protocol for SyncProtocol {
|
|||
}
|
||||
else if command == &types::GetData::command() {
|
||||
let message: types::GetData = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::GetData::command());
|
||||
self.inbound_connection.on_getdata(message, id);
|
||||
self.inbound_connection.on_getdata(message);
|
||||
}
|
||||
else if command == &types::GetBlocks::command() {
|
||||
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::GetBlocks::command());
|
||||
self.inbound_connection.on_getblocks(message, id);
|
||||
self.inbound_connection.on_getblocks(message);
|
||||
}
|
||||
else if command == &types::GetHeaders::command() {
|
||||
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
|
||||
|
@ -214,9 +215,7 @@ impl Protocol for SyncProtocol {
|
|||
}
|
||||
else if command == &types::MemPool::command() {
|
||||
let message: types::MemPool = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::MemPool::command());
|
||||
self.inbound_connection.on_mempool(message, id);
|
||||
self.inbound_connection.on_mempool(message);
|
||||
}
|
||||
else if command == &types::Headers::command() {
|
||||
let message: types::Headers = try!(deserialize_payload(payload, version));
|
||||
|
|
|
@ -29,12 +29,12 @@ impl InboundSyncConnection for InboundConnection {
|
|||
self.local_node.on_peer_inventory(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_getdata(&self, message: types::GetData, id: u32) {
|
||||
self.local_node.on_peer_getdata(self.peer_index, message, id);
|
||||
fn on_getdata(&self, message: types::GetData) {
|
||||
self.local_node.on_peer_getdata(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_getblocks(&self, message: types::GetBlocks, id: u32) {
|
||||
self.local_node.on_peer_getblocks(self.peer_index, message, id);
|
||||
fn on_getblocks(&self, message: types::GetBlocks) {
|
||||
self.local_node.on_peer_getblocks(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_getheaders(&self, message: types::GetHeaders, id: u32) {
|
||||
|
@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection {
|
|||
self.local_node.on_peer_headers(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_mempool(&self, message: types::MemPool, id: u32) {
|
||||
self.local_node.on_peer_mempool(self.peer_index, message, id);
|
||||
fn on_mempool(&self, message: types::MemPool) {
|
||||
self.local_node.on_peer_mempool(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_filterload(&self, message: types::FilterLoad) {
|
||||
|
|
|
@ -95,16 +95,16 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
// TODO: process other inventory types
|
||||
}
|
||||
|
||||
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
|
||||
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_getdata(peer_index, message, id);
|
||||
self.server.serve_getdata(peer_index, message);
|
||||
}
|
||||
|
||||
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
|
||||
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_getblocks(peer_index, message, id);
|
||||
self.server.serve_getblocks(peer_index, message);
|
||||
}
|
||||
|
||||
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
|
||||
|
@ -127,10 +127,15 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
need_wait
|
||||
};
|
||||
|
||||
// if we do not need synchronized responses => inform p2p that we have processed this request
|
||||
let id = if !need_wait {
|
||||
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
|
||||
None
|
||||
} else {
|
||||
Some(id)
|
||||
};
|
||||
|
||||
self.server.serve_getheaders(peer_index, message, id);
|
||||
if need_wait {
|
||||
self.server.wait_peer_requests_completed(peer_index);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
|
||||
|
@ -155,10 +160,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
}
|
||||
}
|
||||
|
||||
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) {
|
||||
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
|
||||
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_mempool(peer_index, id);
|
||||
self.server.serve_mempool(peer_index);
|
||||
}
|
||||
|
||||
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
|
||||
|
@ -252,13 +257,14 @@ mod tests {
|
|||
}
|
||||
|
||||
impl OutboundSyncConnection for DummyOutboundSyncConnection {
|
||||
fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {}
|
||||
fn send_inventory(&self, _message: &types::Inv) {}
|
||||
fn send_getdata(&self, _message: &types::GetData) {}
|
||||
fn send_getblocks(&self, _message: &types::GetBlocks) {}
|
||||
fn send_getheaders(&self, _message: &types::GetHeaders) {}
|
||||
fn send_transaction(&self, _message: &types::Tx) {}
|
||||
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
|
||||
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
|
||||
fn send_block(&self, _message: &types::Block) {}
|
||||
fn send_headers(&self, _message: &types::Headers) {}
|
||||
fn respond_headers(&self, _message: &types::Headers, _id: u32) {}
|
||||
fn send_mempool(&self, _message: &types::MemPool) {}
|
||||
fn send_filterload(&self, _message: &types::FilterLoad) {}
|
||||
fn send_filteradd(&self, _message: &types::FilterAdd) {}
|
||||
|
@ -270,7 +276,7 @@ mod tests {
|
|||
fn send_compact_block(&self, _message: &types::CompactBlock) {}
|
||||
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
|
||||
fn send_block_txn(&self, _message: &types::BlockTxn) {}
|
||||
fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {}
|
||||
fn send_notfound(&self, _message: &types::NotFound) {}
|
||||
fn ignored(&self, _id: u32) {}
|
||||
}
|
||||
|
||||
|
@ -312,10 +318,9 @@ mod tests {
|
|||
hash: genesis_block_hash.clone(),
|
||||
}
|
||||
];
|
||||
let dummy_id = 0;
|
||||
local_node.on_peer_getdata(peer_index, types::GetData {
|
||||
inventory: inventory.clone()
|
||||
}, dummy_id);
|
||||
});
|
||||
// => `getdata` is served
|
||||
let tasks = server.take_tasks();
|
||||
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);
|
||||
|
|
|
@ -127,8 +127,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
assert_eq!(id.raw(), None);
|
||||
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
|
||||
connection.send_block(&block_message, id.raw(), id.is_final());
|
||||
connection.send_block(&block_message);
|
||||
}
|
||||
},
|
||||
Task::SendNotFound(peer_index, unknown_inventory, id) => {
|
||||
|
@ -137,8 +138,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
assert_eq!(id.raw(), None);
|
||||
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
|
||||
connection.send_notfound(¬found, id.raw(), id.is_final());
|
||||
connection.send_notfound(¬found);
|
||||
}
|
||||
},
|
||||
Task::SendInventory(peer_index, inventory, id) => {
|
||||
|
@ -147,8 +149,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
assert_eq!(id.raw(), None);
|
||||
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
|
||||
connection.send_inventory(&inventory, id.raw(), id.is_final());
|
||||
connection.send_inventory(&inventory);
|
||||
}
|
||||
},
|
||||
Task::SendHeaders(peer_index, headers, id) => {
|
||||
|
@ -158,7 +161,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
|
||||
connection.send_headers(&headers, id.raw(), id.is_final());
|
||||
match id.raw() {
|
||||
Some(id) => connection.respond_headers(&headers, id),
|
||||
None => connection.send_headers(&headers),
|
||||
}
|
||||
}
|
||||
},
|
||||
Task::Ignore(peer_index, id) => {
|
||||
|
|
|
@ -14,20 +14,10 @@ use message::types;
|
|||
|
||||
/// Synchronization requests server trait
|
||||
pub trait Server : Send + 'static {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32);
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32);
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32);
|
||||
fn serve_mempool(&self, peer_index: usize, id: u32);
|
||||
fn wait_peer_requests_completed(&self, peer_index: usize);
|
||||
}
|
||||
|
||||
/// Peer requests waiter
|
||||
#[derive(Default)]
|
||||
pub struct PeerRequestsWaiter {
|
||||
/// Awake mutex
|
||||
peer_requests_lock: Mutex<bool>,
|
||||
/// Awake event
|
||||
peer_requests_done: Condvar,
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>);
|
||||
fn serve_mempool(&self, peer_index: usize);
|
||||
}
|
||||
|
||||
/// Synchronization requests server
|
||||
|
@ -43,29 +33,32 @@ struct ServerQueue {
|
|||
queue_ready: Arc<Condvar>,
|
||||
peers_queue: VecDeque<usize>,
|
||||
tasks_queue: HashMap<usize, VecDeque<IndexedServerTask>>,
|
||||
peer_waiters: HashMap<usize, Arc<PeerRequestsWaiter>>,
|
||||
}
|
||||
|
||||
/// `ServerTask` index.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ServerTaskIndex {
|
||||
/// `None` is used when response is sent out-of-order
|
||||
None,
|
||||
/// `Partial` is used when server needs to send more than one response for request.
|
||||
Partial(u32),
|
||||
_Partial(u32),
|
||||
/// `Final` task task can be preceded by many `Partial` tasks with the same id.
|
||||
Final(u32),
|
||||
}
|
||||
|
||||
impl ServerTaskIndex {
|
||||
pub fn raw(&self) -> u32 {
|
||||
pub fn raw(&self) -> Option<u32> {
|
||||
match *self {
|
||||
ServerTaskIndex::Partial(id) | ServerTaskIndex::Final(id) => id,
|
||||
ServerTaskIndex::None => None,
|
||||
ServerTaskIndex::_Partial(id) | ServerTaskIndex::Final(id) => Some(id),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_final(&self) -> bool {
|
||||
pub fn _is_final(&self) -> bool {
|
||||
match *self {
|
||||
ServerTaskIndex::Partial(_) => false,
|
||||
ServerTaskIndex::_Partial(_) => false,
|
||||
ServerTaskIndex::Final(_) => true,
|
||||
ServerTaskIndex::None => panic!("check with raw() before"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -157,7 +150,7 @@ impl SynchronizationServer {
|
|||
ServerTask::ServeGetData(inventory) => {
|
||||
let mut unknown_items: Vec<InventoryVector> = Vec::new();
|
||||
let mut new_tasks: Vec<IndexedServerTask> = Vec::new();
|
||||
let task_id = indexed_task.id.raw();
|
||||
assert_eq!(indexed_task.id.raw(), None);
|
||||
{
|
||||
let chain = chain.read();
|
||||
let storage = chain.storage();
|
||||
|
@ -166,7 +159,7 @@ impl SynchronizationServer {
|
|||
InventoryType::MessageBlock => {
|
||||
match storage.block_number(&item.hash) {
|
||||
Some(_) => {
|
||||
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id));
|
||||
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::None);
|
||||
new_tasks.push(task);
|
||||
},
|
||||
None => unknown_items.push(item),
|
||||
|
@ -179,19 +172,13 @@ impl SynchronizationServer {
|
|||
// respond with `notfound` message for unknown data
|
||||
if !unknown_items.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
|
||||
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id));
|
||||
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::None);
|
||||
new_tasks.push(task);
|
||||
}
|
||||
// schedule data responses
|
||||
if !new_tasks.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
|
||||
// mark last task as the final one
|
||||
if let Some(task) = new_tasks.last_mut() {
|
||||
task.id = ServerTaskIndex::Final(task_id);
|
||||
}
|
||||
queue.lock().add_tasks(peer_index, new_tasks);
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, task_id));
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
|
@ -207,7 +194,7 @@ impl SynchronizationServer {
|
|||
}).collect();
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
|
@ -220,8 +207,8 @@ impl SynchronizationServer {
|
|||
if !blocks_headers.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
|
||||
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
} else if let Some(response_id) = indexed_task.id.raw() {
|
||||
executor.lock().execute(Task::Ignore(peer_index, response_id));
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
|
@ -240,7 +227,7 @@ impl SynchronizationServer {
|
|||
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
|
@ -261,7 +248,8 @@ impl SynchronizationServer {
|
|||
},
|
||||
// ignore
|
||||
ServerTask::Ignore => {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task");
|
||||
executor.lock().execute(Task::Ignore(peer_index, response_id));
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
}
|
||||
|
@ -347,48 +335,41 @@ impl Drop for SynchronizationServer {
|
|||
}
|
||||
|
||||
impl Server for SynchronizationServer {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id));
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None);
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None);
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) {
|
||||
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
|
||||
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id));
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
|
||||
if let Some(id) = id {
|
||||
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_mempool(&self, peer_index: usize, id: u32) {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id));
|
||||
fn serve_mempool(&self, peer_index: usize) {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None);
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
|
||||
fn wait_peer_requests_completed(&self, peer_index: usize) {
|
||||
if let Some(waiter) = {
|
||||
let mut queue = self.queue.lock();
|
||||
queue.get_peer_requests_waiter(peer_index)
|
||||
} {
|
||||
waiter.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerQueue {
|
||||
|
@ -398,7 +379,6 @@ impl ServerQueue {
|
|||
queue_ready: queue_ready,
|
||||
peers_queue: VecDeque::new(),
|
||||
tasks_queue: HashMap::new(),
|
||||
peer_waiters: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -425,11 +405,6 @@ impl ServerQueue {
|
|||
return;
|
||||
}
|
||||
tasks_entry.remove_entry();
|
||||
|
||||
if let Entry::Occupied(entry) = self.peer_waiters.entry(peer_index) {
|
||||
entry.get().awake();
|
||||
entry.remove_entry();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -470,42 +445,6 @@ impl ServerQueue {
|
|||
}
|
||||
self.queue_ready.notify_one();
|
||||
}
|
||||
|
||||
pub fn get_peer_requests_waiter(&mut self, peer_index: usize) -> Option<Arc<PeerRequestsWaiter>> {
|
||||
match self.peer_waiters.entry(peer_index) {
|
||||
Entry::Vacant(entry) => {
|
||||
// there are no pending tasks for this peer
|
||||
if !self.tasks_queue.contains_key(&peer_index) {
|
||||
return None;
|
||||
}
|
||||
|
||||
// there are tasks => wait for completion
|
||||
let waiter = Arc::new(PeerRequestsWaiter::default());
|
||||
entry.insert(waiter.clone());
|
||||
Some(waiter)
|
||||
},
|
||||
Entry::Occupied(entry) => {
|
||||
Some(entry.get().clone())
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PeerRequestsWaiter {
|
||||
pub fn wait(&self) {
|
||||
let mut locker = self.peer_requests_lock.lock();
|
||||
if *locker {
|
||||
return;
|
||||
}
|
||||
|
||||
self.peer_requests_done.wait(&mut locker);
|
||||
}
|
||||
|
||||
pub fn awake(&self) {
|
||||
let mut locker = self.peer_requests_lock.lock();
|
||||
*locker = true;
|
||||
self.peer_requests_done.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -541,30 +480,27 @@ pub mod tests {
|
|||
}
|
||||
|
||||
impl Server for DummyServer {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
|
||||
}
|
||||
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) {
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
}
|
||||
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) {
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option<u32>) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
}
|
||||
|
||||
fn serve_mempool(&self, peer_index: usize, _id: u32) {
|
||||
fn serve_mempool(&self, peer_index: usize) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
|
||||
}
|
||||
|
||||
fn wait_peer_requests_completed(&self, _peer_index: usize) {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_synchronization_server() -> (Arc<RwLock<Chain>>, Arc<Mutex<DummyTaskExecutor>>, SynchronizationServer) {
|
||||
|
@ -584,13 +520,12 @@ pub mod tests {
|
|||
hash: H256::default(),
|
||||
}
|
||||
];
|
||||
let dummy_id = 0;
|
||||
server.serve_getdata(0, types::GetData {
|
||||
inventory: inventory.clone(),
|
||||
}, dummy_id);
|
||||
});
|
||||
// => respond with notfound
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]);
|
||||
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -603,13 +538,12 @@ pub mod tests {
|
|||
hash: test_data::genesis().hash(),
|
||||
}
|
||||
];
|
||||
let dummy_id = 0;
|
||||
server.serve_getdata(0, types::GetData {
|
||||
inventory: inventory.clone(),
|
||||
}, dummy_id);
|
||||
});
|
||||
// => respond with block
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]);
|
||||
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -617,15 +551,14 @@ pub mod tests {
|
|||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for blocks hashes
|
||||
let genesis_block_hash = test_data::genesis().hash();
|
||||
let dummy_id = 5;
|
||||
server.serve_getblocks(0, types::GetBlocks {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![genesis_block_hash.clone()],
|
||||
hash_stop: H256::default(),
|
||||
}, dummy_id);
|
||||
});
|
||||
// => no response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
|
||||
assert_eq!(tasks, vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -633,19 +566,18 @@ pub mod tests {
|
|||
let (chain, executor, server) = create_synchronization_server();
|
||||
chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error");
|
||||
// when asking for blocks hashes
|
||||
let dummy_id = 0;
|
||||
server.serve_getblocks(0, types::GetBlocks {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![test_data::genesis().hash()],
|
||||
hash_stop: H256::default(),
|
||||
}, dummy_id);
|
||||
});
|
||||
// => responds with inventory
|
||||
let inventory = vec![InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: test_data::block_h1().hash(),
|
||||
}];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -658,7 +590,7 @@ pub mod tests {
|
|||
version: 0,
|
||||
block_locator_hashes: vec![genesis_block_hash.clone()],
|
||||
hash_stop: H256::default(),
|
||||
}, dummy_id);
|
||||
}, Some(dummy_id));
|
||||
// => no response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
|
||||
|
@ -674,7 +606,7 @@ pub mod tests {
|
|||
version: 0,
|
||||
block_locator_hashes: vec![test_data::genesis().hash()],
|
||||
hash_stop: H256::default(),
|
||||
}, dummy_id);
|
||||
}, Some(dummy_id));
|
||||
// => responds with headers
|
||||
let headers = vec![
|
||||
test_data::block_h1().block_header,
|
||||
|
@ -687,11 +619,10 @@ pub mod tests {
|
|||
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
|
||||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for memory pool transactions ids
|
||||
let dummy_id = 9;
|
||||
server.serve_mempool(0, dummy_id);
|
||||
server.serve_mempool(0);
|
||||
// => no response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
|
||||
assert_eq!(tasks, vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -702,14 +633,13 @@ pub mod tests {
|
|||
let transaction_hash = transaction.hash();
|
||||
chain.write().insert_verified_transaction(transaction);
|
||||
// when asking for memory pool transactions ids
|
||||
let dummy_id = 0;
|
||||
server.serve_mempool(0, dummy_id);
|
||||
server.serve_mempool(0);
|
||||
// => respond with inventory
|
||||
let inventory = vec![InventoryVector {
|
||||
inv_type: InventoryType::MessageTx,
|
||||
hash: transaction_hash,
|
||||
}];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue