do not saturate until there are headers verifying

This commit is contained in:
Svyatoslav Nikolsky 2019-04-03 16:37:38 +03:00
parent e947ae847c
commit f1ae363000
3 changed files with 49 additions and 14 deletions

View File

@ -187,11 +187,13 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
// in case if verification was synchronous
// => try to switch to saturated state OR execute sync tasks
if self.light_verifier.is_idle() {
let mut client = self.core.lock();
if !client.try_switch_to_saturated_state() {
client.execute_synchronization_tasks(None, None);
}
}
}
fn on_transaction(&self, peer_index: PeerIndex, transaction: IndexedTransaction) {
// block can become:

View File

@ -1028,11 +1028,9 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
fn on_headers_verification_success(&mut self, headers: Vec<IndexedBlockHeader>) {
let headers = self.find_unknown_headers(headers);
if headers.is_empty() {
return;
}
if !headers.is_empty() {
self.chain.schedule_blocks_headers(headers);
}
// switch to synchronization state
if !self.state.is_synchronizing() {

View File

@ -1,6 +1,6 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use parking_lot::Mutex;
@ -57,6 +57,8 @@ pub enum VerificationTask {
/// Synchronization verifier
pub trait Verifier : Send + Sync + 'static {
/// Returns true if there are no scheduled or currently executing tasks.
fn is_idle(&self) -> bool;
/// Verify headers
fn verify_headers(&self, peer: PeerIndex, headers: Vec<IndexedBlockHeader>);
/// Verify block
@ -67,6 +69,8 @@ pub trait Verifier : Send + Sync + 'static {
/// Asynchronous synchronization verifier
pub struct AsyncVerifier {
/// Is verification thread idle?
is_idle: Arc<AtomicBool>,
/// Verification work transmission channel.
verification_work_sender: Mutex<Sender<VerificationTask>>,
/// Verification thread.
@ -130,22 +134,44 @@ impl VerificationTask {
impl AsyncVerifier {
/// Create new async verifier
pub fn new<T: VerificationSink>(verifier: Arc<ChainVerifier>, storage: StorageRef, memory_pool: MemoryPoolRef, sink: Arc<T>, verification_params: VerificationParameters) -> Self {
let is_idle = Arc::new(AtomicBool::new(true));
let (verification_work_sender, verification_work_receiver) = channel();
AsyncVerifier {
is_idle: is_idle.clone(),
verification_work_sender: Mutex::new(verification_work_sender),
verification_worker_thread: Some(thread::Builder::new()
.name("Sync verification thread".to_string())
.spawn(move || {
let verifier = ChainVerifierWrapper::new(verifier, &storage, verification_params);
AsyncVerifier::verification_worker_proc(sink, storage, memory_pool, verifier, verification_work_receiver)
AsyncVerifier::verification_worker_proc(sink, storage, memory_pool, verifier, verification_work_receiver, is_idle)
})
.expect("Error creating sync verification thread"))
}
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc<T: VerificationSink>(sink: Arc<T>, storage: StorageRef, memory_pool: MemoryPoolRef, verifier: ChainVerifierWrapper, work_receiver: Receiver<VerificationTask>) {
while let Ok(task) = work_receiver.recv() {
fn verification_worker_proc<T: VerificationSink>(
sink: Arc<T>,
storage: StorageRef,
memory_pool: MemoryPoolRef,
verifier: ChainVerifierWrapper,
work_receiver: Receiver<VerificationTask>,
is_idle: Arc<AtomicBool>,
) {
loop {
let task = match work_receiver.try_recv() {
Ok(task) => task,
Err(TryRecvError::Empty) => {
is_idle.store(true, Ordering::SeqCst);
match work_receiver.recv() {
Ok(task) => task,
Err(_) => break,
}
},
Err(TryRecvError::Disconnected) => break,
};
is_idle.store(false, Ordering::SeqCst);
if !AsyncVerifier::execute_single_task(&sink, &storage, &memory_pool, &verifier, task) {
break;
}
@ -231,21 +257,22 @@ impl Drop for AsyncVerifier {
}
impl Verifier for AsyncVerifier {
/// Verify headers
fn is_idle(&self) -> bool {
self.is_idle.load(Ordering::SeqCst)
}
fn verify_headers(&self, peer: PeerIndex, headers: Vec<IndexedBlockHeader>) {
self.verification_work_sender.lock()
.send(VerificationTask::VerifyHeaders(peer, headers))
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
}
/// Verify block
fn verify_block(&self, block: IndexedBlock) {
self.verification_work_sender.lock()
.send(VerificationTask::VerifyBlock(block))
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
}
/// Verify transaction
fn verify_transaction(&self, height: BlockHeight, transaction: IndexedTransaction) {
self.verification_work_sender.lock()
.send(VerificationTask::VerifyTransaction(height, transaction))
@ -274,6 +301,10 @@ impl<T> SyncVerifier<T> where T: VerificationSink {
}
impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
fn is_idle(&self) -> bool {
true
}
/// Verify headers
fn verify_headers(&self, _peer: PeerIndex, _headers: Vec<IndexedBlockHeader>) {
unreachable!("SyncVerifier is used only for blocks verification")
@ -358,6 +389,10 @@ pub mod tests {
}
impl Verifier for DummyVerifier {
fn is_idle(&self) -> bool {
true
}
fn verify_headers(&self, _peer: PeerIndex, headers: Vec<IndexedBlockHeader>) {
match self.sink {
Some(ref sink) => sink.on_headers_verification_success(headers),