From f1ae3630004795d38ba2ec77ceeff64f014583e5 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 3 Apr 2019 16:37:38 +0300 Subject: [PATCH] do not saturate until there are headers verifying --- sync/src/synchronization_client.rs | 8 ++-- sync/src/synchronization_client_core.rs | 6 +-- sync/src/synchronization_verifier.rs | 49 +++++++++++++++++++++---- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 6ad1184b..05350d1d 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -187,9 +187,11 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri // in case if verification was synchronous // => try to switch to saturated state OR execute sync tasks - let mut client = self.core.lock(); - if !client.try_switch_to_saturated_state() { - client.execute_synchronization_tasks(None, None); + if self.light_verifier.is_idle() { + let mut client = self.core.lock(); + if !client.try_switch_to_saturated_state() { + client.execute_synchronization_tasks(None, None); + } } } diff --git a/sync/src/synchronization_client_core.rs b/sync/src/synchronization_client_core.rs index cf63382b..36e72e27 100644 --- a/sync/src/synchronization_client_core.rs +++ b/sync/src/synchronization_client_core.rs @@ -1028,12 +1028,10 @@ impl SynchronizationClientCore where T: TaskExecutor { fn on_headers_verification_success(&mut self, headers: Vec) { let headers = self.find_unknown_headers(headers); - if headers.is_empty() { - return; + if !headers.is_empty() { + self.chain.schedule_blocks_headers(headers); } - self.chain.schedule_blocks_headers(headers); - // switch to synchronization state if !self.state.is_synchronizing() { if self.chain.length_of_blocks_state(BlockState::Scheduled) + diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index b951ad54..6bbc1a1f 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; use std::sync::Arc; -use std::sync::mpsc::{channel, Sender, Receiver}; +use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use parking_lot::Mutex; @@ -57,6 +57,8 @@ pub enum VerificationTask { /// Synchronization verifier pub trait Verifier : Send + Sync + 'static { + /// Returns true if there are no scheduled or currently executing tasks. + fn is_idle(&self) -> bool; /// Verify headers fn verify_headers(&self, peer: PeerIndex, headers: Vec); /// Verify block @@ -67,6 +69,8 @@ pub trait Verifier : Send + Sync + 'static { /// Asynchronous synchronization verifier pub struct AsyncVerifier { + /// Is verification thread idle? + is_idle: Arc, /// Verification work transmission channel. verification_work_sender: Mutex>, /// Verification thread. @@ -130,22 +134,44 @@ impl VerificationTask { impl AsyncVerifier { /// Create new async verifier pub fn new(verifier: Arc, storage: StorageRef, memory_pool: MemoryPoolRef, sink: Arc, verification_params: VerificationParameters) -> Self { + let is_idle = Arc::new(AtomicBool::new(true)); let (verification_work_sender, verification_work_receiver) = channel(); AsyncVerifier { + is_idle: is_idle.clone(), verification_work_sender: Mutex::new(verification_work_sender), verification_worker_thread: Some(thread::Builder::new() .name("Sync verification thread".to_string()) .spawn(move || { let verifier = ChainVerifierWrapper::new(verifier, &storage, verification_params); - AsyncVerifier::verification_worker_proc(sink, storage, memory_pool, verifier, verification_work_receiver) + AsyncVerifier::verification_worker_proc(sink, storage, memory_pool, verifier, verification_work_receiver, is_idle) }) .expect("Error creating sync verification thread")) } } /// Thread procedure for handling verification tasks - fn verification_worker_proc(sink: Arc, storage: StorageRef, memory_pool: MemoryPoolRef, verifier: ChainVerifierWrapper, work_receiver: Receiver) { - while let Ok(task) = work_receiver.recv() { + fn verification_worker_proc( + sink: Arc, + storage: StorageRef, + memory_pool: MemoryPoolRef, + verifier: ChainVerifierWrapper, + work_receiver: Receiver, + is_idle: Arc, + ) { + loop { + let task = match work_receiver.try_recv() { + Ok(task) => task, + Err(TryRecvError::Empty) => { + is_idle.store(true, Ordering::SeqCst); + match work_receiver.recv() { + Ok(task) => task, + Err(_) => break, + } + }, + Err(TryRecvError::Disconnected) => break, + }; + + is_idle.store(false, Ordering::SeqCst); if !AsyncVerifier::execute_single_task(&sink, &storage, &memory_pool, &verifier, task) { break; } @@ -231,21 +257,22 @@ impl Drop for AsyncVerifier { } impl Verifier for AsyncVerifier { - /// Verify headers + fn is_idle(&self) -> bool { + self.is_idle.load(Ordering::SeqCst) + } + fn verify_headers(&self, peer: PeerIndex, headers: Vec) { self.verification_work_sender.lock() .send(VerificationTask::VerifyHeaders(peer, headers)) .expect("Verification thread have the same lifetime as `AsyncVerifier`"); } - /// Verify block fn verify_block(&self, block: IndexedBlock) { self.verification_work_sender.lock() .send(VerificationTask::VerifyBlock(block)) .expect("Verification thread have the same lifetime as `AsyncVerifier`"); } - /// Verify transaction fn verify_transaction(&self, height: BlockHeight, transaction: IndexedTransaction) { self.verification_work_sender.lock() .send(VerificationTask::VerifyTransaction(height, transaction)) @@ -274,6 +301,10 @@ impl SyncVerifier where T: VerificationSink { } impl Verifier for SyncVerifier where T: VerificationSink { + fn is_idle(&self) -> bool { + true + } + /// Verify headers fn verify_headers(&self, _peer: PeerIndex, _headers: Vec) { unreachable!("SyncVerifier is used only for blocks verification") @@ -358,6 +389,10 @@ pub mod tests { } impl Verifier for DummyVerifier { + fn is_idle(&self) -> bool { + true + } + fn verify_headers(&self, _peer: PeerIndex, headers: Vec) { match self.sink { Some(ref sink) => sink.on_headers_verification_success(headers),