Added option to close a queue and wait for queued up operations to finish,

rather than just closing a queue and immediately cancelling all operations.
This commit is contained in:
Simon 2016-09-02 19:21:42 -07:00
parent c93d8bdf9c
commit 9cd713439a
2 changed files with 58 additions and 15 deletions

View File

@ -6,7 +6,7 @@
static std::atomic<size_t> workerCounter(0);
AsyncRPCQueue::AsyncRPCQueue() : closed_(false) {
AsyncRPCQueue::AsyncRPCQueue() : closed_(false), finish_(false) {
}
AsyncRPCQueue::~AsyncRPCQueue() {
@ -18,15 +18,20 @@ AsyncRPCQueue::~AsyncRPCQueue() {
*/
void AsyncRPCQueue::run(size_t workerId) {
while (!isClosed()) {
while (true) {
AsyncRPCOperationId key;
std::shared_ptr<AsyncRPCOperation> operation;
{
std::unique_lock< std::mutex > guard(lock_);
while (operation_id_queue_.empty() && !isClosed()) {
while (operation_id_queue_.empty() && !isClosed() && !isFinishing()) {
this->condition_.wait(guard);
}
// Exit if the queue is empty and we are finishing up
if ( isFinishing() && operation_id_queue_.empty() ) {
break;
}
// Exit if the queue is closing.
if (isClosed()) {
break;
@ -66,8 +71,8 @@ void AsyncRPCQueue::run(size_t workerId) {
*/
void AsyncRPCQueue::addOperation(const std::shared_ptr<AsyncRPCOperation> &ptrOperation) {
// Don't add if queue is closed
if (isClosed()) {
// Don't add if queue is closed or finishing
if (isClosed() || isFinishing()) {
return;
}
@ -110,17 +115,31 @@ std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::popOperationForId(AsyncRPCOper
* Return true if the queue is closed to new operations.
*/
bool AsyncRPCQueue::isClosed() const {
return closed_;
return closed_.load();
}
/**
* Close the queue and cancel all existing operations
*/
void AsyncRPCQueue::close() {
this->closed_ = true;
closed_.store(true);
cancelAllOperations();
}
/**
* Return true if the queue is finishing up
*/
bool AsyncRPCQueue::isFinishing() const {
return finish_.load();
}
/**
* Close the queue but finish existing operations. Do not accept new operations.
*/
void AsyncRPCQueue::finish() {
finish_.store(true);
}
/**
* Call cancel() on all operations
*/
@ -171,9 +190,28 @@ std::vector<AsyncRPCOperationId> AsyncRPCQueue::getAllOperationIds() const {
* Calling thread will close and wait for worker threads to join.
*/
void AsyncRPCQueue::closeAndWait() {
if (!this->closed_) {
close();
close();
wait_for_worker_threads();
}
/**
* Block current thread until all workers have finished their tasks.
*/
void AsyncRPCQueue::finishAndWait() {
finish();
wait_for_worker_threads();
}
/**
* Block current thread until all operations are finished or the queue has closed.
*/
void AsyncRPCQueue::wait_for_worker_threads() {
// Notify any workers who are waiting, so they see the updated queue state
{
std::unique_lock< std::mutex > guard(lock_);
this->condition_.notify_all();
}
for (std::thread & t : this->workers_) {
if (t.joinable()) {
t.join();

View File

@ -1,4 +1,4 @@
// Copyright (c) 2014 The Zcash developers
// Copyright (c) 2016 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -36,9 +36,12 @@ public:
void addWorker();
size_t getNumberOfWorkers() const;
bool isClosed() const;
void close();
void closeAndWait();
void cancelAllOperations();
bool isFinishing() const;
void close(); // close queue and cancel all operations
void finish(); // close queue but finishing existing operations
void closeAndWait(); // block thread until all threads have terminated.
void finishAndWait(); // block thread until existing operations have finished, threads terminated
void cancelAllOperations(); // mark all operations in the queue as cancelled
size_t getOperationCount() const;
std::shared_ptr<AsyncRPCOperation> getOperationForId(AsyncRPCOperationId) const;
std::shared_ptr<AsyncRPCOperation> popOperationForId(AsyncRPCOperationId);
@ -46,13 +49,15 @@ public:
std::vector<AsyncRPCOperationId> getAllOperationIds() const;
private:
// addWorker() will spawn a new thread on this method
// addWorker() will spawn a new thread on run())
void run(size_t workerId);
void wait_for_worker_threads();
// Why this is not a recursive lock: http://www.zaval.org/resources/library/butenhof1.html
mutable std::mutex lock_;
std::condition_variable condition_;
bool closed_;
std::atomic<bool> closed_;
std::atomic<bool> finish_;
AsyncRPCOperationMap operation_map_;
std::queue <AsyncRPCOperationId> operation_id_queue_;
std::vector<std::thread> workers_;