From 032488e6e7b4c2d37f3f434d575d25806ddcb0b0 Mon Sep 17 00:00:00 2001 From: Patrick Strateman Date: Mon, 24 Sep 2018 17:03:17 -0400 Subject: [PATCH] Move SocketHandler logic to private method. --- src/net.cpp | 398 ++++++++++++++++++++++++++-------------------------- src/net.h | 1 + 2 files changed, 202 insertions(+), 197 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 644fed5ef..1b47f288f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1262,209 +1262,213 @@ void CConnman::InactivityCheck(CNode *pnode) } } +void CConnman::SocketHandler() +{ + // + // Find which sockets have data to receive + // + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 50000; // frequency to poll pnode->vSend + + fd_set fdsetRecv; + fd_set fdsetSend; + fd_set fdsetError; + FD_ZERO(&fdsetRecv); + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + SOCKET hSocketMax = 0; + bool have_fds = false; + + for (const ListenSocket& hListenSocket : vhListenSocket) { + FD_SET(hListenSocket.socket, &fdsetRecv); + hSocketMax = std::max(hSocketMax, hListenSocket.socket); + have_fds = true; + } + + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodes) + { + // Implement the following logic: + // * If there is data to send, select() for sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. + + bool select_recv = !pnode->fPauseRecv; + bool select_send; + { + LOCK(pnode->cs_vSend); + select_send = !pnode->vSendMsg.empty(); + } + + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = std::max(hSocketMax, pnode->hSocket); + have_fds = true; + + if (select_send) { + FD_SET(pnode->hSocket, &fdsetSend); + continue; + } + if (select_recv) { + FD_SET(pnode->hSocket, &fdsetRecv); + } + } + } + + int nSelect = select(have_fds ? hSocketMax + 1 : 0, + &fdsetRecv, &fdsetSend, &fdsetError, &timeout); + if (interruptNet) + return; + + if (nSelect == SOCKET_ERROR) + { + if (have_fds) + { + int nErr = WSAGetLastError(); + LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); + for (unsigned int i = 0; i <= hSocketMax; i++) + FD_SET(i, &fdsetRecv); + } + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; + } + + // + // Accept new connections + // + for (const ListenSocket& hListenSocket : vhListenSocket) + { + if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) + { + AcceptConnection(hListenSocket); + } + } + + // + // Service each socket + // + std::vector vNodesCopy; + { + LOCK(cs_vNodes); + vNodesCopy = vNodes; + for (CNode* pnode : vNodesCopy) + pnode->AddRef(); + } + for (CNode* pnode : vNodesCopy) + { + if (interruptNet) + return; + + // + // Receive + // + bool recvSet = false; + bool sendSet = false; + bool errorSet = false; + { + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); + sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); + errorSet = FD_ISSET(pnode->hSocket, &fdsetError); + } + if (recvSet || errorSet) + { + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int nBytes = 0; + { + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; + nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); + } + if (nBytes > 0) + { + bool notify = false; + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) + pnode->CloseSocketDisconnect(); + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; + } + WakeMessageHandler(); + } + } + else if (nBytes == 0) + { + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); + } + pnode->CloseSocketDisconnect(); + } + else if (nBytes < 0) + { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + if (!pnode->fDisconnect) + LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); + pnode->CloseSocketDisconnect(); + } + } + } + + // + // Send + // + if (sendSet) + { + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); + } + } + + InactivityCheck(pnode); + } + { + LOCK(cs_vNodes); + for (CNode* pnode : vNodesCopy) + pnode->Release(); + } +} + void CConnman::ThreadSocketHandler() { while (!interruptNet) { DisconnectNodes(); NotifyNumConnectionsChanged(); - - // - // Find which sockets have data to receive - // - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 50000; // frequency to poll pnode->vSend - - fd_set fdsetRecv; - fd_set fdsetSend; - fd_set fdsetError; - FD_ZERO(&fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - SOCKET hSocketMax = 0; - bool have_fds = false; - - for (const ListenSocket& hListenSocket : vhListenSocket) { - FD_SET(hListenSocket.socket, &fdsetRecv); - hSocketMax = std::max(hSocketMax, hListenSocket.socket); - have_fds = true; - } - - { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) - { - // Implement the following logic: - // * If there is data to send, select() for sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, select() for - // receiving data. - // * Hand off all complete messages to the processor, to be handled without - // blocking here. - - bool select_recv = !pnode->fPauseRecv; - bool select_send; - { - LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); - } - - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = std::max(hSocketMax, pnode->hSocket); - have_fds = true; - - if (select_send) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } - if (select_recv) { - FD_SET(pnode->hSocket, &fdsetRecv); - } - } - } - - int nSelect = select(have_fds ? hSocketMax + 1 : 0, - &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - if (interruptNet) - return; - - if (nSelect == SOCKET_ERROR) - { - if (have_fds) - { - int nErr = WSAGetLastError(); - LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); - for (unsigned int i = 0; i <= hSocketMax; i++) - FD_SET(i, &fdsetRecv); - } - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) - return; - } - - // - // Accept new connections - // - for (const ListenSocket& hListenSocket : vhListenSocket) - { - if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) - { - AcceptConnection(hListenSocket); - } - } - - // - // Service each socket - // - std::vector vNodesCopy; - { - LOCK(cs_vNodes); - vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) - pnode->AddRef(); - } - for (CNode* pnode : vNodesCopy) - { - if (interruptNet) - return; - - // - // Receive - // - bool recvSet = false; - bool sendSet = false; - bool errorSet = false; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); - sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); - errorSet = FD_ISSET(pnode->hSocket, &fdsetError); - } - if (recvSet || errorSet) - { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int nBytes = 0; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - } - if (nBytes > 0) - { - bool notify = false; - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) - pnode->CloseSocketDisconnect(); - RecordBytesRecv(nBytes); - if (notify) { - size_t nSizeAdded = 0; - auto it(pnode->vRecvMsg.begin()); - for (; it != pnode->vRecvMsg.end(); ++it) { - if (!it->complete()) - break; - nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } - WakeMessageHandler(); - } - } - else if (nBytes == 0) - { - // socket closed gracefully - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "socket closed\n"); - } - pnode->CloseSocketDisconnect(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - if (!pnode->fDisconnect) - LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); - } - } - } - - // - // Send - // - if (sendSet) - { - LOCK(pnode->cs_vSend); - size_t nBytes = SocketSendData(pnode); - if (nBytes) { - RecordBytesSent(nBytes); - } - } - - InactivityCheck(pnode); - } - { - LOCK(cs_vNodes); - for (CNode* pnode : vNodesCopy) - pnode->Release(); - } + SocketHandler(); } } diff --git a/src/net.h b/src/net.h index 480e9b83f..03be59131 100644 --- a/src/net.h +++ b/src/net.h @@ -339,6 +339,7 @@ private: void DisconnectNodes(); void NotifyNumConnectionsChanged(); void InactivityCheck(CNode *pnode); + void SocketHandler(); void ThreadSocketHandler(); void ThreadDNSAddressSeed();