net: move messageHandlerCondition to CConnman

This commit is contained in:
Cory Fields 2016-04-18 21:33:54 -04:00
parent 960cf2e405
commit ee44fa9576
2 changed files with 9 additions and 5 deletions

View File

@ -89,7 +89,6 @@ std::string strSubVersion;
limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ); limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);
static CSemaphore *semOutbound = NULL; static CSemaphore *semOutbound = NULL;
boost::condition_variable messageHandlerCondition;
// Signals for message handling // Signals for message handling
static CNodeSignals g_signals; static CNodeSignals g_signals;
@ -688,8 +687,9 @@ void CNode::copyStats(CNodeStats &stats)
#undef X #undef X
// requires LOCK(cs_vRecvMsg) // requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{ {
complete = false;
while (nBytes > 0) { while (nBytes > 0) {
// get current incomplete message, or create a new one // get current incomplete message, or create a new one
@ -728,7 +728,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
msg.nTime = GetTimeMicros(); msg.nTime = GetTimeMicros();
messageHandlerCondition.notify_one(); complete = true;
} }
} }
@ -1247,8 +1247,11 @@ void CConnman::ThreadSocketHandler()
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
if (nBytes > 0) if (nBytes > 0)
{ {
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
if(notify)
messageHandlerCondition.notify_one();
pnode->nLastRecv = GetTime(); pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes; pnode->nRecvBytes += nBytes;
pnode->RecordBytesRecv(nBytes); pnode->RecordBytesRecv(nBytes);

View File

@ -228,6 +228,7 @@ private:
std::vector<CNode*> vNodes; std::vector<CNode*> vNodes;
mutable CCriticalSection cs_vNodes; mutable CCriticalSection cs_vNodes;
std::atomic<NodeId> nLastNodeId; std::atomic<NodeId> nLastNodeId;
boost::condition_variable messageHandlerCondition;
}; };
extern std::unique_ptr<CConnman> g_connman; extern std::unique_ptr<CConnman> g_connman;
void MapPort(bool fUseUPnP); void MapPort(bool fUseUPnP);
@ -550,7 +551,7 @@ public:
} }
// requires LOCK(cs_vRecvMsg) // requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes); bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
// requires LOCK(cs_vRecvMsg) // requires LOCK(cs_vRecvMsg)
void SetRecvVersion(int nVersionIn) void SetRecvVersion(int nVersionIn)