From 9cd713439adc4f7489c3c6f1cfcabd9a963c9285 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 2 Sep 2016 19:21:42 -0700 Subject: [PATCH] Added option to close a queue and wait for queued up operations to finish, rather than just closing a queue and immediately cancelling all operations. --- src/asyncrpcqueue.cpp | 56 ++++++++++++++++++++++++++++++++++++------- src/asyncrpcqueue.h | 17 ++++++++----- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/src/asyncrpcqueue.cpp b/src/asyncrpcqueue.cpp index 5bc96668..a63619f0 100644 --- a/src/asyncrpcqueue.cpp +++ b/src/asyncrpcqueue.cpp @@ -6,7 +6,7 @@ static std::atomic workerCounter(0); -AsyncRPCQueue::AsyncRPCQueue() : closed_(false) { +AsyncRPCQueue::AsyncRPCQueue() : closed_(false), finish_(false) { } AsyncRPCQueue::~AsyncRPCQueue() { @@ -18,15 +18,20 @@ AsyncRPCQueue::~AsyncRPCQueue() { */ void AsyncRPCQueue::run(size_t workerId) { - while (!isClosed()) { + while (true) { AsyncRPCOperationId key; std::shared_ptr operation; { std::unique_lock< std::mutex > guard(lock_); - while (operation_id_queue_.empty() && !isClosed()) { + while (operation_id_queue_.empty() && !isClosed() && !isFinishing()) { this->condition_.wait(guard); } + // Exit if the queue is empty and we are finishing up + if ( isFinishing() && operation_id_queue_.empty() ) { + break; + } + // Exit if the queue is closing. if (isClosed()) { break; @@ -66,8 +71,8 @@ void AsyncRPCQueue::run(size_t workerId) { */ void AsyncRPCQueue::addOperation(const std::shared_ptr &ptrOperation) { - // Don't add if queue is closed - if (isClosed()) { + // Don't add if queue is closed or finishing + if (isClosed() || isFinishing()) { return; } @@ -110,17 +115,31 @@ std::shared_ptr AsyncRPCQueue::popOperationForId(AsyncRPCOper * Return true if the queue is closed to new operations. */ bool AsyncRPCQueue::isClosed() const { - return closed_; + return closed_.load(); } /** * Close the queue and cancel all existing operations */ void AsyncRPCQueue::close() { - this->closed_ = true; + closed_.store(true); cancelAllOperations(); } +/** + * Return true if the queue is finishing up + */ +bool AsyncRPCQueue::isFinishing() const { + return finish_.load(); +} + +/** + * Close the queue but finish existing operations. Do not accept new operations. + */ +void AsyncRPCQueue::finish() { + finish_.store(true); +} + /** * Call cancel() on all operations */ @@ -171,9 +190,28 @@ std::vector AsyncRPCQueue::getAllOperationIds() const { * Calling thread will close and wait for worker threads to join. */ void AsyncRPCQueue::closeAndWait() { - if (!this->closed_) { - close(); + close(); + wait_for_worker_threads(); +} + +/** + * Block current thread until all workers have finished their tasks. + */ +void AsyncRPCQueue::finishAndWait() { + finish(); + wait_for_worker_threads(); +} + +/** + * Block current thread until all operations are finished or the queue has closed. + */ +void AsyncRPCQueue::wait_for_worker_threads() { + // Notify any workers who are waiting, so they see the updated queue state + { + std::unique_lock< std::mutex > guard(lock_); + this->condition_.notify_all(); } + for (std::thread & t : this->workers_) { if (t.joinable()) { t.join(); diff --git a/src/asyncrpcqueue.h b/src/asyncrpcqueue.h index 3de7b5ba..837d4609 100644 --- a/src/asyncrpcqueue.h +++ b/src/asyncrpcqueue.h @@ -1,4 +1,4 @@ -// Copyright (c) 2014 The Zcash developers +// Copyright (c) 2016 The Zcash developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -36,9 +36,12 @@ public: void addWorker(); size_t getNumberOfWorkers() const; bool isClosed() const; - void close(); - void closeAndWait(); - void cancelAllOperations(); + bool isFinishing() const; + void close(); // close queue and cancel all operations + void finish(); // close queue but finishing existing operations + void closeAndWait(); // block thread until all threads have terminated. + void finishAndWait(); // block thread until existing operations have finished, threads terminated + void cancelAllOperations(); // mark all operations in the queue as cancelled size_t getOperationCount() const; std::shared_ptr getOperationForId(AsyncRPCOperationId) const; std::shared_ptr popOperationForId(AsyncRPCOperationId); @@ -46,13 +49,15 @@ public: std::vector getAllOperationIds() const; private: - // addWorker() will spawn a new thread on this method + // addWorker() will spawn a new thread on run()) void run(size_t workerId); + void wait_for_worker_threads(); // Why this is not a recursive lock: http://www.zaval.org/resources/library/butenhof1.html mutable std::mutex lock_; std::condition_variable condition_; - bool closed_; + std::atomic closed_; + std::atomic finish_; AsyncRPCOperationMap operation_map_; std::queue operation_id_queue_; std::vector workers_;