Auto merge of #2555 - jasondavies:fix-2554, r=str4d

Fix various thread assertion errors caused during shutdown.

Cherry-picked from the following upstream PRs:

- bitcoin/bitcoin#6719
- bitcoin/bitcoin#6990
- bitcoin/bitcoin#8421
  - Second commit only in this PR
- bitcoin/bitcoin#11006

I've cherry-picked the relevant commits, along with a note in each commit referring to the original Bitcoin commit ID (and the Zcash issue numbers where applicable).  I've tested each issue with/without these patches applied.

Closes #2214, #2334, and #2554.
This commit is contained in:
Homu 2017-10-23 15:15:14 -07:00
commit fb2f98e00b
4 changed files with 87 additions and 16 deletions

View File

@ -72,13 +72,35 @@ private:
std::deque<WorkItem*> queue; std::deque<WorkItem*> queue;
bool running; bool running;
size_t maxDepth; size_t maxDepth;
int numThreads;
/** RAII object to keep track of number of running worker threads */
class ThreadCounter
{
public:
WorkQueue &wq;
ThreadCounter(WorkQueue &w): wq(w)
{
boost::lock_guard<boost::mutex> lock(wq.cs);
wq.numThreads += 1;
}
~ThreadCounter()
{
boost::lock_guard<boost::mutex> lock(wq.cs);
wq.numThreads -= 1;
wq.cond.notify_all();
}
};
public: public:
WorkQueue(size_t maxDepth) : running(true), WorkQueue(size_t maxDepth) : running(true),
maxDepth(maxDepth) maxDepth(maxDepth),
numThreads(0)
{ {
} }
/* Precondition: worker threads have all stopped */ /*( Precondition: worker threads have all stopped
* (call WaitExit)
*/
~WorkQueue() ~WorkQueue()
{ {
while (!queue.empty()) { while (!queue.empty()) {
@ -100,6 +122,7 @@ public:
/** Thread function */ /** Thread function */
void Run() void Run()
{ {
ThreadCounter count(*this);
while (running) { while (running) {
WorkItem* i = 0; WorkItem* i = 0;
{ {
@ -122,6 +145,13 @@ public:
running = false; running = false;
cond.notify_all(); cond.notify_all();
} }
/** Wait for worker threads to exit */
void WaitExit()
{
boost::unique_lock<boost::mutex> lock(cs);
while (numThreads > 0)
cond.wait(lock);
}
/** Return current depth of queue */ /** Return current depth of queue */
size_t Depth() size_t Depth()
@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets;
static WorkQueue<HTTPClosure>* workQueue = 0; static WorkQueue<HTTPClosure>* workQueue = 0;
//! Handlers for (sub)paths //! Handlers for (sub)paths
std::vector<HTTPPathHandler> pathHandlers; std::vector<HTTPPathHandler> pathHandlers;
//! Bound listening sockets
std::vector<evhttp_bound_socket *> boundSockets;
/** Check if a network address is allowed to access the HTTP server */ /** Check if a network address is allowed to access the HTTP server */
static bool ClientAllowed(const CNetAddr& netaddr) static bool ClientAllowed(const CNetAddr& netaddr)
@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
} }
} }
/** Callback to reject HTTP requests after shutdown. */
static void http_reject_request_cb(struct evhttp_request* req, void*)
{
LogPrint("http", "Rejecting request while shutting down\n");
evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
}
/** Event dispatcher thread */ /** Event dispatcher thread */
static void ThreadHTTP(struct event_base* base, struct evhttp* http) static void ThreadHTTP(struct event_base* base, struct evhttp* http)
{ {
@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http)
static bool HTTPBindAddresses(struct evhttp* http) static bool HTTPBindAddresses(struct evhttp* http)
{ {
int defaultPort = GetArg("-rpcport", BaseParams().RPCPort()); int defaultPort = GetArg("-rpcport", BaseParams().RPCPort());
int nBound = 0;
std::vector<std::pair<std::string, uint16_t> > endpoints; std::vector<std::pair<std::string, uint16_t> > endpoints;
// Determine what addresses to bind to // Determine what addresses to bind to
@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http)
// Bind addresses // Bind addresses
for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) { for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) {
LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second); LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second);
if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) { evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second);
nBound += 1; if (bind_handle) {
boundSockets.push_back(bind_handle);
} else { } else {
LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second); LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second);
} }
} }
return nBound > 0; return !boundSockets.empty();
} }
/** Simple wrapper to set thread name and run work queue */ /** Simple wrapper to set thread name and run work queue */
@ -399,23 +438,33 @@ bool InitHTTPServer()
return true; return true;
} }
bool StartHTTPServer(boost::thread_group& threadGroup) boost::thread threadHTTP;
bool StartHTTPServer()
{ {
LogPrint("http", "Starting HTTP server\n"); LogPrint("http", "Starting HTTP server\n");
int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
threadGroup.create_thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP)); threadHTTP = boost::thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP));
for (int i = 0; i < rpcThreads; i++) for (int i = 0; i < rpcThreads; i++) {
threadGroup.create_thread(boost::bind(&HTTPWorkQueueRun, workQueue)); boost::thread rpc_worker(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
}
return true; return true;
} }
void InterruptHTTPServer() void InterruptHTTPServer()
{ {
LogPrint("http", "Interrupting HTTP server\n"); LogPrint("http", "Interrupting HTTP server\n");
if (eventBase) if (eventHTTP) {
event_base_loopbreak(eventBase); // Unlisten sockets
BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL);
}
if (workQueue) if (workQueue)
workQueue->Interrupt(); workQueue->Interrupt();
} }
@ -423,7 +472,27 @@ void InterruptHTTPServer()
void StopHTTPServer() void StopHTTPServer()
{ {
LogPrint("http", "Stopping HTTP server\n"); LogPrint("http", "Stopping HTTP server\n");
delete workQueue; if (workQueue) {
LogPrint("http", "Waiting for HTTP worker threads to exit\n");
workQueue->WaitExit();
delete workQueue;
}
if (eventBase) {
LogPrint("http", "Waiting for HTTP event thread to exit\n");
// Exit the event loop as soon as there are no active events.
event_base_loopexit(eventBase, nullptr);
// Give event loop a few seconds to exit (to send back last RPC responses), then break it
// Before this was solved with event_base_loopexit, but that didn't work as expected in
// at least libevent 2.0.21 and always introduced a delay. In libevent
// master that appears to be solved, so in the future that solution
// could be used again (if desirable).
// (see discussion in https://github.com/bitcoin/bitcoin/pull/6990)
if (!threadHTTP.try_join_for(boost::chrono::milliseconds(2000))) {
LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n");
event_base_loopbreak(eventBase);
threadHTTP.join();
}
}
if (eventHTTP) { if (eventHTTP) {
evhttp_free(eventHTTP); evhttp_free(eventHTTP);
eventHTTP = 0; eventHTTP = 0;
@ -432,6 +501,7 @@ void StopHTTPServer()
event_base_free(eventBase); event_base_free(eventBase);
eventBase = 0; eventBase = 0;
} }
LogPrint("http", "Stopped HTTP server\n");
} }
struct event_base* EventBase() struct event_base* EventBase()

View File

@ -28,7 +28,7 @@ bool InitHTTPServer();
* This is separate from InitHTTPServer to give users race-condition-free time * This is separate from InitHTTPServer to give users race-condition-free time
* to register their handlers between InitHTTPServer and StartHTTPServer. * to register their handlers between InitHTTPServer and StartHTTPServer.
*/ */
bool StartHTTPServer(boost::thread_group& threadGroup); bool StartHTTPServer();
/** Interrupt HTTP server threads */ /** Interrupt HTTP server threads */
void InterruptHTTPServer(); void InterruptHTTPServer();
/** Stop HTTP server */ /** Stop HTTP server */

View File

@ -714,7 +714,7 @@ bool AppInitServers(boost::thread_group& threadGroup)
return false; return false;
if (GetBoolArg("-rest", false) && !StartREST()) if (GetBoolArg("-rest", false) && !StartREST())
return false; return false;
if (!StartHTTPServer(threadGroup)) if (!StartHTTPServer())
return false; return false;
return true; return true;
} }

View File

@ -246,7 +246,8 @@ UniValue stop(const UniValue& params, bool fHelp)
throw runtime_error( throw runtime_error(
"stop\n" "stop\n"
"\nStop Zcash server."); "\nStop Zcash server.");
// Shutdown will take long enough that the response should get back // Event loop will exit after current HTTP requests have been handled, so
// this reply will get back to the client.
StartShutdown(); StartShutdown();
return "Zcash server stopping"; return "Zcash server stopping";
} }