From 7325b1556684158461c9c0df056c9d071444b54e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:11:57 -0500 Subject: [PATCH 1/7] net: a few small cleanups before replacing boost threads - Drop the interruption point directly after the pnode allocation. This would be leaky if hit. - Rearrange thread creation so that the socket handler comes first --- src/net.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 3ac962354..9c58577f1 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1806,7 +1806,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return false; CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure); - boost::this_thread::interruption_point(); if (!pnode) return false; @@ -2146,14 +2145,14 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st // Start threads // + // Send and receive from sockets, accept connections + threadGroup.create_thread(boost::bind(&TraceThread >, "net", boost::function(boost::bind(&CConnman::ThreadSocketHandler, this)))); + if (!GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else threadGroup.create_thread(boost::bind(&TraceThread >, "dnsseed", boost::function(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); - // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread >, "net", boost::function(boost::bind(&CConnman::ThreadSocketHandler, this)))); - // Initiate outbound connections from -addnode threadGroup.create_thread(boost::bind(&TraceThread >, "addcon", boost::function(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); From 799df9115f262fbc25c2c2737ccd8a4e1b20e5b0 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:12:15 -0500 Subject: [PATCH 2/7] net: add CThreadInterrupt and InterruptibleSleep --- src/Makefile.am | 2 ++ src/threadinterrupt.cpp | 41 +++++++++++++++++++++++++++++++++++++++++ src/threadinterrupt.h | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 src/threadinterrupt.cpp create mode 100644 src/threadinterrupt.h diff --git a/src/Makefile.am b/src/Makefile.am index 89b90e6df..3428d4613 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -138,6 +138,7 @@ BITCOIN_CORE_H = \ support/lockedpool.h \ sync.h \ threadsafety.h \ + threadinterrupt.h \ timedata.h \ torcontrol.h \ txdb.h \ @@ -327,6 +328,7 @@ libbitcoin_util_a_SOURCES = \ rpc/protocol.cpp \ support/cleanse.cpp \ sync.cpp \ + threadinterrupt.cpp \ util.cpp \ utilmoneystr.cpp \ utilstrencodings.cpp \ diff --git a/src/threadinterrupt.cpp b/src/threadinterrupt.cpp new file mode 100644 index 000000000..9d691079e --- /dev/null +++ b/src/threadinterrupt.cpp @@ -0,0 +1,41 @@ +// Copyright (c) 2009-2010 Satoshi Nakamoto +// Copyright (c) 2009-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "threadinterrupt.h" + +CThreadInterrupt::operator bool() const +{ + return flag.load(std::memory_order_acquire); +} + +void CThreadInterrupt::reset() +{ + flag.store(false, std::memory_order_release); +} + +void CThreadInterrupt::operator()() +{ + { + std::unique_lock lock(mut); + flag.store(true, std::memory_order_release); + } + cond.notify_all(); +} + +bool CThreadInterrupt::sleep_for(std::chrono::milliseconds rel_time) +{ + std::unique_lock lock(mut); + return !cond.wait_for(lock, rel_time, [this]() { return flag.load(std::memory_order_acquire); }); +} + +bool CThreadInterrupt::sleep_for(std::chrono::seconds rel_time) +{ + return sleep_for(std::chrono::duration_cast(rel_time)); +} + +bool CThreadInterrupt::sleep_for(std::chrono::minutes rel_time) +{ + return sleep_for(std::chrono::duration_cast(rel_time)); +} diff --git a/src/threadinterrupt.h b/src/threadinterrupt.h new file mode 100644 index 000000000..54e310280 --- /dev/null +++ b/src/threadinterrupt.h @@ -0,0 +1,34 @@ +// Copyright (c) 2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_THREADINTERRUPT_H +#define BITCOIN_THREADINTERRUPT_H + +#include +#include +#include +#include + +/* + A helper class for interruptible sleeps. Calling operator() will interrupt + any current sleep, and after that point operator bool() will return true + until reset. +*/ +class CThreadInterrupt +{ +public: + explicit operator bool() const; + void operator()(); + void reset(); + bool sleep_for(std::chrono::milliseconds rel_time); + bool sleep_for(std::chrono::seconds rel_time); + bool sleep_for(std::chrono::minutes rel_time); + +private: + std::condition_variable cond; + std::mutex mut; + std::atomic flag; +}; + +#endif //BITCOIN_THREADINTERRUPT_H From 0985052319263bd7ca9744af3504682b3ea8e21a Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:12:44 -0500 Subject: [PATCH 3/7] net: make net interruptible Also now that net threads are interruptible, switch them to use std threads/binds/mutexes/condvars. --- src/init.cpp | 4 +- src/net.cpp | 105 ++++++++++++++++++++++++++++++++++----------------- src/net.h | 19 +++++++++- 3 files changed, 91 insertions(+), 37 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 7d2bcb57b..7a493cc19 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup) InterruptRPC(); InterruptREST(); InterruptTorControl(); + if (g_connman) + g_connman->Interrupt(); threadGroup.interrupt_all(); } @@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe; connOptions.nMaxOutboundLimit = nMaxOutboundLimit; - if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions)) + if (!connman.Start(scheduler, strNodeError, connOptions)) return InitError(strNodeError); // ********************************************************* Step 12: finished diff --git a/src/net.cpp b/src/net.cpp index 9c58577f1..a66679cd8 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1042,7 +1042,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { void CConnman::ThreadSocketHandler() { unsigned int nPrevNodeCount = 0; - while (true) + while (!interruptNet) { // // Disconnect nodes @@ -1180,7 +1180,8 @@ void CConnman::ThreadSocketHandler() int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - boost::this_thread::interruption_point(); + if (interruptNet) + return; if (nSelect == SOCKET_ERROR) { @@ -1193,7 +1194,8 @@ void CConnman::ThreadSocketHandler() } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - MilliSleep(timeout.tv_usec/1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; } // @@ -1219,7 +1221,8 @@ void CConnman::ThreadSocketHandler() } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - boost::this_thread::interruption_point(); + if (interruptNet) + return; // // Receive @@ -1241,7 +1244,7 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); if(notify) - messageHandlerCondition.notify_one(); + condMsgProc.notify_one(); pnode->nLastRecv = GetTime(); pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); @@ -1469,7 +1472,8 @@ void CConnman::ThreadDNSAddressSeed() // less influence on the network topology, and reduces traffic to the seeds. if ((addrman.size() > 0) && (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { - MilliSleep(11 * 1000); + if (!interruptNet.sleep_for(std::chrono::seconds(11))) + return; LOCK(cs_vNodes); int nRelevant = 0; @@ -1580,10 +1584,12 @@ void CConnman::ThreadOpenConnections() OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } @@ -1592,14 +1598,16 @@ void CConnman::ThreadOpenConnections() // Minimum time before next feeler connection (in microseconds). int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); - while (true) + while (!interruptNet) { ProcessOneShot(); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; CSemaphoreGrant grant(*semOutbound); - boost::this_thread::interruption_point(); + if (interruptNet) + return; // Add seed nodes if DNS seeds are all down (an infrastructure attack?). if (addrman.size() == 0 && (GetTime() - nStart > 60)) { @@ -1657,7 +1665,7 @@ void CConnman::ThreadOpenConnections() int64_t nANow = GetAdjustedTime(); int nTries = 0; - while (true) + while (!interruptNet) { CAddrInfo addr = addrman.Select(fFeeler); @@ -1700,7 +1708,8 @@ void CConnman::ThreadOpenConnections() if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); - MilliSleep(randsleep); + if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep))) + return; LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); } @@ -1779,11 +1788,12 @@ void CConnman::ThreadOpenAddedConnections() // OpenNetworkConnection can detect existing connections to that IP/port. CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - - MilliSleep(120000); // Retry every 2 minutes + if (!interruptNet.sleep_for(std::chrono::minutes(2))) + return; } } @@ -1793,7 +1803,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai // // Initiate outbound network connection // - boost::this_thread::interruption_point(); + if (interruptNet) { + return false; + } if (!fNetworkActive) { return false; } @@ -1819,13 +1831,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return true; } - void CConnman::ThreadMessageHandler() { - boost::mutex condition_mutex; - boost::unique_lock lock(condition_mutex); - - while (true) + while (!flagInterruptMsgProc) { std::vector vNodesCopy; { @@ -1860,7 +1868,8 @@ void CConnman::ThreadMessageHandler() } } } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; // Send messages { @@ -1868,7 +1877,8 @@ void CConnman::ThreadMessageHandler() if (lockSend) GetNodeSignals().SendMessages(pnode, *this); } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; } { @@ -1877,8 +1887,10 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) - messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); + if (fSleep) { + std::unique_lock lock(mutexMsgProc); + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + } } } @@ -2070,6 +2082,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe nMaxOutbound = 0; nBestHeight = 0; clientInterface = NULL; + flagInterruptMsgProc = false; } NodeId CConnman::GetNewNodeId() @@ -2077,7 +2090,7 @@ NodeId CConnman::GetNewNodeId() return nLastNodeId.fetch_add(1, std::memory_order_relaxed); } -bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) +bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions) { nTotalBytesRecv = 0; nTotalBytesSent = 0; @@ -2144,24 +2157,26 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st // // Start threads // + interruptNet.reset(); + flagInterruptMsgProc = false; // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread >, "net", boost::function(boost::bind(&CConnman::ThreadSocketHandler, this)))); + threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); if (!GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else - threadGroup.create_thread(boost::bind(&TraceThread >, "dnsseed", boost::function(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); + threadDNSAddressSeed = std::thread(&TraceThread >, "dnsseed", std::function(std::bind(&CConnman::ThreadDNSAddressSeed, this))); // Initiate outbound connections from -addnode - threadGroup.create_thread(boost::bind(&TraceThread >, "addcon", boost::function(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); + threadOpenAddedConnections = std::thread(&TraceThread >, "addcon", std::function(std::bind(&CConnman::ThreadOpenAddedConnections, this))); // Initiate outbound connections unless connect=0 if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") - threadGroup.create_thread(boost::bind(&TraceThread >, "opencon", boost::function(boost::bind(&CConnman::ThreadOpenConnections, this)))); + threadOpenConnections = std::thread(&TraceThread >, "opencon", std::function(std::bind(&CConnman::ThreadOpenConnections, this))); // Process messages - threadGroup.create_thread(boost::bind(&TraceThread >, "msghand", boost::function(boost::bind(&CConnman::ThreadMessageHandler, this)))); + threadMessageHandler = std::thread(&TraceThread >, "msghand", std::function(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); @@ -2184,12 +2199,33 @@ public: } instance_of_cnetcleanup; -void CConnman::Stop() +void CConnman::Interrupt() { - LogPrintf("%s\n",__func__); + { + std::lock_guard lock(mutexMsgProc); + flagInterruptMsgProc = true; + } + condMsgProc.notify_all(); + + interruptNet(); + if (semOutbound) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) semOutbound->post(); +} + +void CConnman::Stop() +{ + if (threadMessageHandler.joinable()) + threadMessageHandler.join(); + if (threadOpenConnections.joinable()) + threadOpenConnections.join(); + if (threadOpenAddedConnections.joinable()) + threadOpenAddedConnections.join(); + if (threadDNSAddressSeed.joinable()) + threadDNSAddressSeed.join(); + if (threadSocketHandler.joinable()) + threadSocketHandler.join(); if (fAddressesInitialized) { @@ -2232,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode) CConnman::~CConnman() { + Interrupt(); Stop(); } diff --git a/src/net.h b/src/net.h index a7c0ecf32..b26f28326 100644 --- a/src/net.h +++ b/src/net.h @@ -19,11 +19,14 @@ #include "streams.h" #include "sync.h" #include "uint256.h" +#include "threadinterrupt.h" #include #include #include +#include #include +#include #ifndef WIN32 #include @@ -142,8 +145,9 @@ public: }; CConnman(uint64_t seed0, uint64_t seed1); ~CConnman(); - bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options); + bool Start(CScheduler& scheduler, std::string& strNodeError, Options options); void Stop(); + void Interrupt(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool GetNetworkActive() const { return fNetworkActive; }; void SetNetworkActive(bool active); @@ -402,7 +406,6 @@ private: std::list vNodesDisconnected; mutable CCriticalSection cs_vNodes; std::atomic nLastNodeId; - boost::condition_variable messageHandlerCondition; /** Services this instance offers */ ServiceFlags nLocalServices; @@ -419,6 +422,18 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + + std::condition_variable condMsgProc; + std::mutex mutexMsgProc; + std::atomic flagInterruptMsgProc; + + CThreadInterrupt interruptNet; + + std::thread threadDNSAddressSeed; + std::thread threadSocketHandler; + std::thread threadOpenAddedConnections; + std::thread threadOpenConnections; + std::thread threadMessageHandler; }; extern std::unique_ptr g_connman; void Discover(boost::thread_group& threadGroup); From d3d7056d2a562301b3770c4ede1dfc8ffb00cf4b Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:13:04 -0500 Subject: [PATCH 4/7] net: make net processing interruptible --- src/net.cpp | 4 ++-- src/net.h | 4 ++-- src/net_processing.cpp | 33 +++++++++++++++++++-------------- src/net_processing.h | 5 +++-- src/test/DoS_tests.cpp | 20 +++++++++++++------- 5 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index a66679cd8..bbfada430 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1856,7 +1856,7 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this)) + if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) pnode->CloseSocketDisconnect(); if (pnode->nSendSize < GetSendBufferSize()) @@ -1875,7 +1875,7 @@ void CConnman::ThreadMessageHandler() { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - GetNodeSignals().SendMessages(pnode, *this); + GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); } if (flagInterruptMsgProc) return; diff --git a/src/net.h b/src/net.h index b26f28326..89c08da86 100644 --- a/src/net.h +++ b/src/net.h @@ -460,8 +460,8 @@ struct CombinerAll // Signals for message handling struct CNodeSignals { - boost::signals2::signal ProcessMessages; - boost::signals2::signal SendMessages; + boost::signals2::signal&), CombinerAll> ProcessMessages; + boost::signals2::signal&), CombinerAll> SendMessages; boost::signals2::signal InitializeNode; boost::signals2::signal FinalizeNode; }; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 380decbcb..f3a04080d 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman) +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic& interruptMsgProc) { std::deque::iterator it = pfrom->vRecvGetData.begin(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam const CInv &inv = *it; { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return; + it++; if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) @@ -1055,7 +1057,7 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params return nFetchFlags; } -bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman) +bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic& interruptMsgProc) { unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nSince = nNow - 10 * 60; BOOST_FOREACH(CAddress& addr, vAddr) { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) continue; @@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, { CInv &inv = vInv[nInv]; - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; bool fAlreadyHave = AlreadyHave(inv); LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id); @@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); } @@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash; pfrom->vRecvGetData.push_back(inv); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); return true; } @@ -1925,10 +1929,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // cs_main if (fProcessBLOCKTXN) - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman); + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fRevertToHeaderProcessing) - return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); + return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fBlockReconstructed) { // If we got here, we were able to optimistically reconstruct a @@ -2441,7 +2445,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // requires LOCK(cs_vRecvMsg) -bool ProcessMessages(CNode* pfrom, CConnman& connman) +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -2459,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fOk = true; if (!pfrom->vRecvGetData.empty()) - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; @@ -2520,8 +2524,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fRet = false; try { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman); - boost::this_thread::interruption_point(); + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); + if (interruptMsgProc) + return true; } catch (const std::ios_base::failure& e) { @@ -2585,7 +2590,7 @@ public: } }; -bool SendMessages(CNode* pto, CConnman& connman) +bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { diff --git a/src/net_processing.h b/src/net_processing.h index 130433cc7..9e76cad50 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -39,13 +39,14 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); void Misbehaving(NodeId nodeid, int howmuch); /** Process protocol messages received from a given node */ -bool ProcessMessages(CNode* pfrom, CConnman& connman); +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interrupt); /** * Send queued protocol messages to be sent to a give node. * * @param[in] pto The node which we are sending messages to. * @param[in] connman The connection manager for that node. + * @param[in] interrupt Interrupt condition for processing threads */ -bool SendMessages(CNode* pto, CConnman& connman); +bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interrupt); #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index d90dcaeb0..35edf60a2 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -47,6 +47,8 @@ BOOST_FIXTURE_TEST_SUITE(DoS_tests, TestingSetup) BOOST_AUTO_TEST_CASE(DoS_banning) { + std::atomic interruptDummy(false); + connman->ClearBanned(); CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true); @@ -54,7 +56,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning) GetNodeSignals().InitializeNode(&dummyNode1, *connman); dummyNode1.nVersion = 1; Misbehaving(dummyNode1.GetId(), 100); // Should get banned - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned @@ -64,16 +66,18 @@ BOOST_AUTO_TEST_CASE(DoS_banning) GetNodeSignals().InitializeNode(&dummyNode2, *connman); dummyNode2.nVersion = 1; Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman); + SendMessages(&dummyNode2, *connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman); + SendMessages(&dummyNode2, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr2)); } BOOST_AUTO_TEST_CASE(DoS_banscore) { + std::atomic interruptDummy(false); + connman->ClearBanned(); ForceSetArg("-banscore", "111"); // because 11 is my favorite number CAddress addr1(ip(0xa0b0c001), NODE_NONE); @@ -82,19 +86,21 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) GetNodeSignals().InitializeNode(&dummyNode1, *connman); dummyNode1.nVersion = 1; Misbehaving(dummyNode1.GetId(), 100); - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 10); - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 1); - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); } BOOST_AUTO_TEST_CASE(DoS_bantime) { + std::atomic interruptDummy(false); + connman->ClearBanned(); int64_t nStartTime = GetTime(); SetMockTime(nStartTime); // Overrides future calls to GetTime() @@ -106,7 +112,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) dummyNode.nVersion = 1; Misbehaving(dummyNode.GetId(), 100); - SendMessages(&dummyNode, *connman); + SendMessages(&dummyNode, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr)); SetMockTime(nStartTime+60*60); From 5cb0fcee8137d6de8d2b9525aa45fd11ab2231c8 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:13:19 -0500 Subject: [PATCH 5/7] net: remove thread_interrupted catch This is now a std::thread, so there's no hope of catching a boost interruption point. --- src/net_processing.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f3a04080d..e0c12d853 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2551,9 +2551,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru PrintExceptionContinue(&e, "ProcessMessages()"); } } - catch (const boost::thread_interrupted&) { - throw; - } catch (const std::exception& e) { PrintExceptionContinue(&e, "ProcessMessages()"); } catch (...) { From 8b3159ef0a912da67c545a3d24f4558f8df866e4 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:13:31 -0500 Subject: [PATCH 6/7] net: make proxy receives interruptible --- src/net.cpp | 2 ++ src/netbase.cpp | 12 ++++++++++-- src/netbase.h | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index bbfada430..7e16e2382 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2157,6 +2157,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c // // Start threads // + InterruptSocks5(false); interruptNet.reset(); flagInterruptMsgProc = false; @@ -2208,6 +2209,7 @@ void CConnman::Interrupt() condMsgProc.notify_all(); interruptNet(); + InterruptSocks5(true); if (semOutbound) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) diff --git a/src/netbase.cpp b/src/netbase.cpp index da94fd4d1..87c7abd7c 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -19,6 +19,7 @@ #ifdef HAVE_GETADDRINFO_A #include #endif +#include #ifndef WIN32 #if HAVE_INET_PTON @@ -44,6 +45,7 @@ bool fNameLookup = DEFAULT_NAME_LOOKUP; // Need ample time for negotiation for very slow proxies such as Tor (milliseconds) static const int SOCKS5_RECV_TIMEOUT = 20 * 1000; +static std::atomic interruptSocks5Recv(false); enum Network ParseNetwork(std::string net) { boost::to_lower(net); @@ -206,7 +208,7 @@ struct timeval MillisToTimeval(int64_t nTimeout) /** * Read bytes from socket. This will either read the full number of bytes requested * or return False on error or timeout. - * This function can be interrupted by boost thread interrupt. + * This function can be interrupted by calling InterruptSocks5() * * @param data Buffer to receive into * @param len Length of data to receive @@ -246,7 +248,8 @@ bool static InterruptibleRecv(char* data, size_t len, int timeout, SOCKET& hSock return false; } } - boost::this_thread::interruption_point(); + if (interruptSocks5Recv) + return false; curTime = GetTimeMillis(); } return len == 0; @@ -715,3 +718,8 @@ bool SetSocketNonBlocking(SOCKET& hSocket, bool fNonBlocking) return true; } + +void InterruptSocks5(bool interrupt) +{ + interruptSocks5Recv = interrupt; +} diff --git a/src/netbase.h b/src/netbase.h index eb39d1657..9ff8102bd 100644 --- a/src/netbase.h +++ b/src/netbase.h @@ -63,5 +63,6 @@ bool SetSocketNonBlocking(SOCKET& hSocket, bool fNonBlocking); * Convert milliseconds to a struct timeval for e.g. select. */ struct timeval MillisToTimeval(int64_t nTimeout); +void InterruptSocks5(bool interrupt); #endif // BITCOIN_NETBASE_H From 67ee4ec9015592c8447955356adfcbb1bf473e32 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Tue, 27 Dec 2016 17:13:51 -0500 Subject: [PATCH 7/7] net: misc header cleanups --- src/net.cpp | 2 -- src/netbase.cpp | 7 ------- 2 files changed, 9 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 7e16e2382..66c853915 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -35,8 +35,6 @@ #include #endif -#include -#include #include diff --git a/src/netbase.cpp b/src/netbase.cpp index 87c7abd7c..afa3c3b13 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -16,21 +16,14 @@ #include "util.h" #include "utilstrencodings.h" -#ifdef HAVE_GETADDRINFO_A -#include -#endif #include #ifndef WIN32 -#if HAVE_INET_PTON -#include -#endif #include #endif #include // for to_lower() #include // for startswith() and endswith() -#include #if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL) #define MSG_NOSIGNAL 0