hash incoming headers without holding sync lock

This commit is contained in:
Svyatoslav Nikolsky 2019-04-05 12:05:09 +03:00
parent 0f6ff605d7
commit 92b0774cc6
4 changed files with 14 additions and 16 deletions

View File

@ -1,4 +1,4 @@
use chain::{IndexedTransaction, IndexedBlock}; use chain::{IndexedTransaction, IndexedBlock, IndexedBlockHeader};
use message::types; use message::types;
use p2p::{InboundSyncConnection, InboundSyncConnectionRef, InboundSyncConnectionStateRef}; use p2p::{InboundSyncConnection, InboundSyncConnectionRef, InboundSyncConnectionStateRef};
use types::{PeersRef, LocalNodeRef, PeerIndex, RequestId}; use types::{PeersRef, LocalNodeRef, PeerIndex, RequestId};
@ -103,7 +103,8 @@ impl InboundSyncConnection for InboundConnection {
return; return;
} }
self.node.on_headers(self.peer_index, message); let headers = message.headers.into_iter().map(IndexedBlockHeader::from_raw).collect();
self.node.on_headers(self.peer_index, headers);
} }
fn on_mempool(&self, message: types::MemPool) { fn on_mempool(&self, message: types::MemPool) {

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use parking_lot::{Mutex, Condvar}; use parking_lot::{Mutex, Condvar};
use time; use time;
use futures::{lazy, finished}; use futures::{lazy, finished};
use chain::{IndexedTransaction, IndexedBlock}; use chain::{IndexedTransaction, IndexedBlock, IndexedBlockHeader};
use keys::Address; use keys::Address;
use message::types; use message::types;
use miner::BlockAssembler; use miner::BlockAssembler;
@ -94,9 +94,9 @@ impl<U, V> LocalNode<U, V> where U: Server, V: Client {
} }
/// When headers message is received /// When headers message is received
pub fn on_headers(&self, peer_index: PeerIndex, message: types::Headers) { pub fn on_headers(&self, peer_index: PeerIndex, headers: Vec<IndexedBlockHeader>) {
trace!(target: "sync", "Got `headers` message from peer#{}. Headers len: {}", peer_index, message.headers.len()); trace!(target: "sync", "Got `headers` message from peer#{}. Headers len: {}", peer_index, headers.len());
self.client.on_headers(peer_index, message); self.client.on_headers(peer_index, headers);
} }
/// When transaction is received /// When transaction is received

View File

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use parking_lot::Mutex; use parking_lot::Mutex;
use chain::{IndexedTransaction, IndexedBlock}; use chain::{IndexedTransaction, IndexedBlock, IndexedBlockHeader};
use message::types; use message::types;
use synchronization_executor::TaskExecutor; use synchronization_executor::TaskExecutor;
use synchronization_verifier::{Verifier, TransactionVerificationSink}; use synchronization_verifier::{Verifier, TransactionVerificationSink};
@ -124,7 +124,7 @@ pub trait Client : Send + Sync + 'static {
fn on_connect(&self, peer_index: PeerIndex); fn on_connect(&self, peer_index: PeerIndex);
fn on_disconnect(&self, peer_index: PeerIndex); fn on_disconnect(&self, peer_index: PeerIndex);
fn on_inventory(&self, peer_index: PeerIndex, message: types::Inv); fn on_inventory(&self, peer_index: PeerIndex, message: types::Inv);
fn on_headers(&self, peer_index: PeerIndex, message: types::Headers); fn on_headers(&self, peer_index: PeerIndex, headers: Vec<IndexedBlockHeader>);
fn on_block(&self, peer_index: PeerIndex, block: IndexedBlock); fn on_block(&self, peer_index: PeerIndex, block: IndexedBlock);
fn on_transaction(&self, peer_index: PeerIndex, transaction: IndexedTransaction); fn on_transaction(&self, peer_index: PeerIndex, transaction: IndexedTransaction);
fn on_notfound(&self, peer_index: PeerIndex, message: types::NotFound); fn on_notfound(&self, peer_index: PeerIndex, message: types::NotFound);
@ -160,8 +160,8 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().on_inventory(peer_index, message); self.core.lock().on_inventory(peer_index, message);
} }
fn on_headers(&self, peer_index: PeerIndex, message: types::Headers) { fn on_headers(&self, peer_index: PeerIndex, headers: Vec<IndexedBlockHeader>) {
let headers_to_verify = self.core.lock().on_headers(peer_index, message); let headers_to_verify = self.core.lock().on_headers(peer_index, headers);
if let Some(headers_to_verify) = headers_to_verify { if let Some(headers_to_verify) = headers_to_verify {
self.light_verifier.verify_headers(peer_index, headers_to_verify); self.light_verifier.verify_headers(peer_index, headers_to_verify);
} }

View File

@ -72,7 +72,7 @@ pub trait ClientCore {
fn on_connect(&mut self, peer_index: PeerIndex); fn on_connect(&mut self, peer_index: PeerIndex);
fn on_disconnect(&mut self, peer_index: PeerIndex); fn on_disconnect(&mut self, peer_index: PeerIndex);
fn on_inventory(&self, peer_index: PeerIndex, message: types::Inv); fn on_inventory(&self, peer_index: PeerIndex, message: types::Inv);
fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) -> Option<Vec<IndexedBlockHeader>>; fn on_headers(&mut self, peer_index: PeerIndex, headers: Vec<IndexedBlockHeader>) -> Option<Vec<IndexedBlockHeader>>;
fn on_block(&mut self, peer_index: PeerIndex, block: IndexedBlock) -> Option<VecDeque<PartiallyVerifiedBlock>>; fn on_block(&mut self, peer_index: PeerIndex, block: IndexedBlock) -> Option<VecDeque<PartiallyVerifiedBlock>>;
fn on_transaction(&mut self, peer_index: PeerIndex, transaction: IndexedTransaction) -> Option<VecDeque<IndexedTransaction>>; fn on_transaction(&mut self, peer_index: PeerIndex, transaction: IndexedTransaction) -> Option<VecDeque<IndexedTransaction>>;
fn on_notfound(&mut self, peer_index: PeerIndex, message: types::NotFound); fn on_notfound(&mut self, peer_index: PeerIndex, message: types::NotFound);
@ -254,11 +254,8 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
} }
/// Try to queue synchronization of unknown blocks when blocks headers are received. /// Try to queue synchronization of unknown blocks when blocks headers are received.
fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) -> Option<Vec<IndexedBlockHeader>> { fn on_headers(&mut self, peer_index: PeerIndex, headers: Vec<IndexedBlockHeader>) -> Option<Vec<IndexedBlockHeader>> {
assert!(!message.headers.is_empty(), "This must be checked in incoming connection"); assert!(! headers.is_empty(), "This must be checked in incoming connection");
// transform to indexed headers
let headers: Vec<_> = message.headers.into_iter().map(IndexedBlockHeader::from_raw).collect();
// update peers to select next tasks // update peers to select next tasks
self.peers_tasks.on_headers_received(peer_index); self.peers_tasks.on_headers_received(peer_index);