Auto merge of #4752 - str4d:small-httpserver-backports, r=daira

Small httpserver.cpp backports

Also includes a change to the `uiInterface.NotifyBlockTip` signal API.
These remove merge conflicts from subsequent backports for `sync.h`.

Cherry-picked from the following upstream PRs:
- bitcoin/bitcoin#6859
- bitcoin/bitcoin#7112
  - Only the non-QT changes.
- bitcoin/bitcoin#7966
- bitcoin/bitcoin#8421
  - We already backported the second commit in zcash/zcash#2555
This commit is contained in:
Homu 2020-10-01 12:48:58 +00:00
commit a983344931
7 changed files with 69 additions and 48 deletions

View File

@ -10,7 +10,7 @@
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, start_nodes, str_to_b64str
from http.client import HTTPConnection
from http.client import HTTPConnection, BAD_REQUEST, NOT_FOUND
from urllib.parse import urlparse
class HTTPBasicsTest (BitcoinTestFramework):
@ -90,5 +90,19 @@ class HTTPBasicsTest (BitcoinTestFramework):
assert_equal(b'"error":null' in out1, True)
assert_equal(conn.sock!=None, True) # connection must be closed because bitcoind should use keep-alive by default
# Check excessive request size
conn = HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
conn.request('GET', '/' + ('x'*1000), '', headers)
out1 = conn.getresponse()
assert_equal(out1.status, NOT_FOUND)
conn = HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
conn.request('GET', '/' + ('x'*10000), '', headers)
out1 = conn.getresponse()
assert_equal(out1.status, BAD_REQUEST)
if __name__ == '__main__':
HTTPBasicsTest().main()

View File

@ -19,6 +19,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>
#include <future>
#include <event2/event.h>
#include <event2/http.h>
@ -34,16 +35,15 @@
#endif
#endif
#include <boost/algorithm/string/case_conv.hpp> // for to_lower()
#include <boost/foreach.hpp>
#include <boost/scoped_ptr.hpp>
/** Maximum size of http request (request line + headers) */
static const size_t MAX_HEADERS_SIZE = 8192;
/** HTTP request work item */
class HTTPWorkItem : public HTTPClosure
{
public:
HTTPWorkItem(HTTPRequest* req, const std::string &path, const HTTPRequestHandler& func):
req(req), path(path), func(func)
HTTPWorkItem(std::unique_ptr<HTTPRequest> req, const std::string &path, const HTTPRequestHandler& func):
req(std::move(req)), path(path), func(func)
{
}
void operator()()
@ -51,7 +51,7 @@ public:
func(req.get(), path);
}
boost::scoped_ptr<HTTPRequest> req;
std::unique_ptr<HTTPRequest> req;
private:
std::string path;
@ -66,10 +66,9 @@ class WorkQueue
{
private:
/** Mutex protects entire object */
CWaitableCriticalSection cs;
CConditionVariable cond;
/* XXX in C++11 we can use std::unique_ptr here and avoid manual cleanup */
std::deque<WorkItem*> queue;
std::mutex cs;
std::condition_variable cond;
std::deque<std::unique_ptr<WorkItem>> queue;
bool running;
size_t maxDepth;
int numThreads;
@ -81,12 +80,12 @@ private:
WorkQueue &wq;
ThreadCounter(WorkQueue &w): wq(w)
{
boost::lock_guard<boost::mutex> lock(wq.cs);
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads += 1;
}
~ThreadCounter()
{
boost::lock_guard<boost::mutex> lock(wq.cs);
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads -= 1;
wq.cond.notify_all();
}
@ -98,24 +97,20 @@ public:
numThreads(0)
{
}
/*( Precondition: worker threads have all stopped
/** Precondition: worker threads have all stopped
* (call WaitExit)
*/
~WorkQueue()
{
while (!queue.empty()) {
delete queue.front();
queue.pop_front();
}
}
/** Enqueue a work item */
bool Enqueue(WorkItem* item)
{
boost::unique_lock<boost::mutex> lock(cs);
std::unique_lock<std::mutex> lock(cs);
if (queue.size() >= maxDepth) {
return false;
}
queue.push_back(item);
queue.emplace_back(std::unique_ptr<WorkItem>(item));
cond.notify_one();
return true;
}
@ -124,31 +119,30 @@ public:
{
ThreadCounter count(*this);
while (running) {
WorkItem* i = 0;
std::unique_ptr<WorkItem> i;
{
boost::unique_lock<boost::mutex> lock(cs);
std::unique_lock<std::mutex> lock(cs);
while (running && queue.empty())
cond.wait(lock);
if (!running)
break;
i = queue.front();
i = std::move(queue.front());
queue.pop_front();
}
(*i)();
delete i;
}
}
/** Interrupt and exit loops */
void Interrupt()
{
boost::unique_lock<boost::mutex> lock(cs);
std::unique_lock<std::mutex> lock(cs);
running = false;
cond.notify_all();
}
/** Wait for worker threads to exit */
void WaitExit()
{
boost::unique_lock<boost::mutex> lock(cs);
std::unique_lock<std::mutex> lock(cs);
while (numThreads > 0)
cond.wait(lock);
}
@ -156,7 +150,7 @@ public:
/** Return current depth of queue */
size_t Depth()
{
boost::unique_lock<boost::mutex> lock(cs);
std::unique_lock<std::mutex> lock(cs);
return queue.size();
}
};
@ -193,9 +187,10 @@ static bool ClientAllowed(const CNetAddr& netaddr)
{
if (!netaddr.IsValid())
return false;
BOOST_FOREACH (const CSubNet& subnet, rpc_allow_subnets)
for (const CSubNet& subnet : rpc_allow_subnets) {
if (subnet.Match(netaddr))
return true;
}
return false;
}
@ -207,7 +202,7 @@ static bool InitHTTPAllowList()
rpc_allow_subnets.push_back(CSubNet("::1")); // always allow IPv6 localhost
if (mapMultiArgs.count("-rpcallowip")) {
const std::vector<std::string>& vAllow = mapMultiArgs["-rpcallowip"];
BOOST_FOREACH (std::string strAllow, vAllow) {
for (std::string strAllow : vAllow) {
CSubNet subnet(strAllow);
if (!subnet.IsValid()) {
uiInterface.ThreadSafeMessageBox(
@ -219,8 +214,9 @@ static bool InitHTTPAllowList()
}
}
std::string strAllowed;
BOOST_FOREACH (const CSubNet& subnet, rpc_allow_subnets)
for (const CSubNet& subnet : rpc_allow_subnets) {
strAllowed += subnet.ToString() + " ";
}
LogPrint("http", "Allowing HTTP connections from: %s\n", strAllowed);
return true;
}
@ -285,12 +281,15 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
// Dispatch to worker thread
if (i != iend) {
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(hreq.release(), path, i->handler));
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
assert(workQueue);
if (workQueue->Enqueue(item.get()))
{
item.release(); /* if true, queue took ownership */
else
} else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");
}
} else {
hreq->WriteReply(HTTP_NOTFOUND);
}
@ -304,13 +303,14 @@ static void http_reject_request_cb(struct evhttp_request* req, void*)
}
/** Event dispatcher thread */
static void ThreadHTTP(struct event_base* base, struct evhttp* http)
static bool ThreadHTTP(struct event_base* base, struct evhttp* http)
{
RenameThread("zcash-http");
LogPrint("http", "Entering http event loop\n");
event_base_dispatch(base);
// Event loop will be interrupted by InterruptHTTPServer()
LogPrint("http", "Exited http event loop\n");
return event_base_got_break(base) == 0;
}
/** Bind HTTP server to specified addresses */
@ -418,6 +418,7 @@ bool InitHTTPServer()
}
evhttp_set_timeout(http, GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));
evhttp_set_max_headers_size(http, MAX_HEADERS_SIZE);
evhttp_set_max_body_size(http, MAX_SIZE);
evhttp_set_gencb(http, http_request_cb, NULL);
@ -438,17 +439,20 @@ bool InitHTTPServer()
return true;
}
boost::thread threadHTTP;
std::thread threadHTTP;
std::future<bool> threadResult;
bool StartHTTPServer()
{
LogPrint("http", "Starting HTTP server\n");
int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
threadHTTP = boost::thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP));
std::packaged_task<bool(event_base*, evhttp*)> task(ThreadHTTP);
threadResult = task.get_future();
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
for (int i = 0; i < rpcThreads; i++) {
boost::thread rpc_worker(HTTPWorkQueueRun, workQueue);
std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
}
return true;
@ -459,7 +463,7 @@ void InterruptHTTPServer()
LogPrint("http", "Interrupting HTTP server\n");
if (eventHTTP) {
// Unlisten sockets
BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) {
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
// Reject requests on current connections
@ -487,11 +491,11 @@ void StopHTTPServer()
// master that appears to be solved, so in the future that solution
// could be used again (if desirable).
// (see discussion in https://github.com/bitcoin/bitcoin/pull/6990)
if (!threadHTTP.try_join_for(boost::chrono::milliseconds(2000))) {
if (threadResult.valid() && threadResult.wait_for(std::chrono::milliseconds(2000)) == std::future_status::timeout) {
LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n");
event_base_loopbreak(eventBase);
threadHTTP.join();
}
threadHTTP.join();
}
if (eventHTTP) {
evhttp_free(eventHTTP);
@ -600,7 +604,7 @@ void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size());
HTTPEvent* ev = new HTTPEvent(eventBase, true,
boost::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL));
std::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL));
ev->trigger(0);
replySent = true;
req = 0; // transferred back to main thread
@ -665,4 +669,3 @@ void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch)
pathHandlers.erase(i);
}
}

View File

@ -7,8 +7,7 @@
#include <string>
#include <stdint.h>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <functional>
static const int DEFAULT_HTTP_THREADS=4;
static const int DEFAULT_HTTP_WORKQUEUE=16;

View File

@ -506,11 +506,14 @@ std::string HelpMessage(HelpMessageMode mode)
return strUsage;
}
static void BlockNotifyCallback(const uint256& hashNewTip)
static void BlockNotifyCallback(bool initialSync, const CBlockIndex *pBlockIndex)
{
if (initialSync || !pBlockIndex)
return;
std::string strCmd = GetArg("-blocknotify", "");
boost::replace_all(strCmd, "%s", hashNewTip.GetHex());
boost::replace_all(strCmd, "%s", pBlockIndex->GetBlockHash().GetHex());
boost::thread t(runCommand, strCmd); // thread runs free
}

View File

@ -3628,6 +3628,8 @@ bool ActivateBestChain(CValidationState& state, const CChainParams& chainparams,
// When we reach this point, we switched to a new tip (stored in pindexNewTip).
// Notifications/callbacks that can run without cs_main
// Always notify the UI if a new block tip was connected
uiInterface.NotifyBlockTip(fInitialDownload, pindexNewTip);
if (!fInitialDownload) {
uint256 hashNewTip = pindexNewTip->GetBlockHash();
// Relay inventory, but don't relay old inventory during initial block download.
@ -3642,7 +3644,6 @@ bool ActivateBestChain(CValidationState& state, const CChainParams& chainparams,
}
// Notify external listeners about the new tip.
GetMainSignals().UpdatedBlockTip(pindexNewTip);
uiInterface.NotifyBlockTip(hashNewTip);
}
} while(pindexMostWork != chainActive.Tip());
CheckBlockIndex(chainparams.GetConsensus());

View File

@ -733,7 +733,7 @@ void static BitcoinMiner(const CChainParams& chainparams)
std::mutex m_cs;
bool cancelSolver = false;
boost::signals2::connection c = uiInterface.NotifyBlockTip.connect(
[&m_cs, &cancelSolver](const uint256& hashNewTip) mutable {
[&m_cs, &cancelSolver](bool, const CBlockIndex *) mutable {
std::lock_guard<std::mutex> lock{m_cs};
cancelSolver = true;
}

View File

@ -15,6 +15,7 @@
class CBasicKeyStore;
class CWallet;
class uint256;
class CBlockIndex;
/** General change type (added, updated, removed). */
enum ChangeType
@ -97,7 +98,7 @@ public:
boost::signals2::signal<void (const std::string &title, int nProgress)> ShowProgress;
/** New block has been accepted */
boost::signals2::signal<void (const uint256& hash)> NotifyBlockTip;
boost::signals2::signal<void (bool, const CBlockIndex *)> NotifyBlockTip;
/** Transaction expired */
boost::signals2::signal<void (const uint256& txid)> NotifyTxExpiration;