From d25cd3ec4e8961c5f36c29a65395f52d0db294c5 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 14 Apr 2016 17:45:49 -0700 Subject: [PATCH] Add receiver-side protocol implementation for CMPCTBLOCK stuff --- src/main.cpp | 215 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 207 insertions(+), 8 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 911c0a648..26b215f94 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,6 +7,7 @@ #include "addrman.h" #include "arith_uint256.h" +#include "blockencodings.h" #include "chainparams.h" #include "checkpoints.h" #include "checkqueue.h" @@ -197,8 +198,9 @@ namespace { /** Blocks that are in flight, and that are in the queue to be downloaded. Protected by cs_main. */ struct QueuedBlock { uint256 hash; - CBlockIndex* pindex; //!< Optional. - bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request. + CBlockIndex* pindex; //!< Optional. + bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request. + std::unique_ptr partialBlock; //!< Optional, used for CMPCTBLOCK downloads }; map::iterator> > mapBlocksInFlight; @@ -364,6 +366,7 @@ void FinalizeNode(NodeId nodeid) { // Requires cs_main. // Returns a bool indicating whether we requested this block. +// Also used if a block was /not/ received and timed out or started with another peer bool MarkBlockAsReceived(const uint256& hash) { map::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); if (itInFlight != mapBlocksInFlight.end()) { @@ -387,17 +390,26 @@ bool MarkBlockAsReceived(const uint256& hash) { } // Requires cs_main. -void MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL) { +// returns false, still setting pit, if the block was already in flight from the same peer +// pit will only be valid as long as the same cs_main lock is being held +bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL, list::iterator **pit = NULL) { CNodeState *state = State(nodeid); assert(state != NULL); + // Short-circuit most stuff in case its from the same node + map::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); + if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) { + *pit = &itInFlight->second.second; + return false; + } + // Make sure it's not listed somewhere already. MarkBlockAsReceived(hash); - QueuedBlock newentry = {hash, pindex, pindex != NULL}; - list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry); + list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), + {hash, pindex, pindex != NULL, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&mempool) : NULL)}); state->nBlocksInFlight++; - state->nBlocksInFlightValidHeaders += newentry.fValidatedHeaders; + state->nBlocksInFlightValidHeaders += it->fValidatedHeaders; if (state->nBlocksInFlight == 1) { // We're starting a block download (batch) from this peer. state->nDownloadingSince = GetTimeMicros(); @@ -405,7 +417,10 @@ void MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Pa if (state->nBlocksInFlightValidHeaders == 1 && pindex != NULL) { nPeersWithValidatedDownloads++; } - mapBlocksInFlight[hash] = std::make_pair(nodeid, it); + itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first; + if (pit) + *pit = &itInFlight->second.second; + return true; } /** Check whether the last unknown block a peer advertised is not yet known. */ @@ -4783,6 +4798,16 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // nodes) pfrom->PushMessage(NetMsgType::SENDHEADERS); } + if (pfrom->nVersion >= SHORT_IDS_BLOCKS_VERSION) { + // Tell our peer we are willing to provide version-1 cmpctblocks + // However, we do not request new block announcements using + // cmpctblock messages. + // We send this to non-NODE NETWORK peers as well, because + // they may wish to request compact blocks from us + bool fAnnounceUsingCMPCTBLOCK = false; + uint64_t nCMPCTBLOCKVersion = 1; + pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion); + } } @@ -4915,7 +4940,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, CNodeState *nodestate = State(pfrom->GetId()); if (CanDirectFetch(chainparams.GetConsensus()) && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { - vToFetch.push_back(inv); + if (nodestate->fProvidesHeaderAndIDs) + vToFetch.push_back(CInv(MSG_CMPCT_BLOCK, inv.hash)); + else + vToFetch.push_back(inv); // Mark block as in flight already, even though the actual "getdata" message only goes out // later (within the same cs_main lock, though). MarkBlockAsInFlight(pfrom->GetId(), inv.hash, chainparams.GetConsensus()); @@ -5232,6 +5260,174 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } + else if (strCommand == NetMsgType::CMPCTBLOCK && !fImporting && !fReindex) // Ignore blocks received while importing + { + CBlockHeaderAndShortTxIDs cmpctblock; + vRecv >> cmpctblock; + + LOCK(cs_main); + + if (mapBlockIndex.find(cmpctblock.header.hashPrevBlock) == mapBlockIndex.end()) { + // Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers + if (!IsInitialBlockDownload()) + pfrom->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256()); + return true; + } + + CBlockIndex *pindex = NULL; + CValidationState state; + if (!AcceptBlockHeader(cmpctblock.header, state, chainparams, &pindex)) { + int nDoS; + if (state.IsInvalid(nDoS)) { + if (nDoS > 0) + Misbehaving(pfrom->GetId(), nDoS); + LogPrintf("Peer %d sent us invalid header via cmpctblock\n", pfrom->id); + return true; + } + } + + // If AcceptBlockHeader returned true, it set pindex + assert(pindex); + UpdateBlockAvailability(pfrom->GetId(), pindex->GetBlockHash()); + + std::map::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash()); + bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end(); + + if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here + return true; + + if (pindex->nChainWork <= chainActive.Tip()->nChainWork || // We know something better + pindex->nTx != 0) { // We had this block at some point, but pruned it + if (fAlreadyInFlight) { + // We requested this block for some reason, but our mempool will probably be useless + // so we just grab the block via normal getdata + std::vector vInv(1); + vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); + pfrom->PushMessage(NetMsgType::GETDATA, vInv); + return true; + } + } + + // If we're not close to tip yet, give up and let parallel block fetch work its magic + if (!fAlreadyInFlight && !CanDirectFetch(chainparams.GetConsensus())) + return true; + + CNodeState *nodestate = State(pfrom->GetId()); + + // We want to be a bit conservative just to be extra careful about DoS + // possibilities in compact block processing... + if (pindex->nHeight <= chainActive.Height() + 2) { + if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || + (fAlreadyInFlight && blockInFlightIt->second.first == pfrom->GetId())) { + list::iterator *queuedBlockIt = NULL; + if (!MarkBlockAsInFlight(pfrom->GetId(), pindex->GetBlockHash(), chainparams.GetConsensus(), pindex, &queuedBlockIt)) { + if (!(*queuedBlockIt)->partialBlock) + (*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&mempool)); + else { + // The block was already in flight using compact blocks from the same peer + LogPrint("net", "Peer sent us compact block we were already syncing!\n"); + return true; + } + } + + PartiallyDownloadedBlock& partialBlock = *(*queuedBlockIt)->partialBlock; + ReadStatus status = partialBlock.InitData(cmpctblock); + if (status == READ_STATUS_INVALID) { + MarkBlockAsReceived(pindex->GetBlockHash()); // Reset in-flight state in case of whitelist + Misbehaving(pfrom->GetId(), 100); + LogPrintf("Peer %d sent us invalid compact block\n", pfrom->id); + return true; + } else if (status == READ_STATUS_FAILED) { + // Duplicate txindexes, the block is now in-flight, so just request it + std::vector vInv(1); + vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); + pfrom->PushMessage(NetMsgType::GETDATA, vInv); + return true; + } + + BlockTransactionsRequest req; + for (size_t i = 0; i < cmpctblock.BlockTxCount(); i++) { + if (!partialBlock.IsTxAvailable(i)) + req.indexes.push_back(i); + } + if (req.indexes.empty()) { + // Dirty hack to jump to BLOCKTXN code (TODO: move message handling into their own functions) + BlockTransactions txn; + txn.blockhash = cmpctblock.header.GetHash(); + CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION); + blockTxnMsg << txn; + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams); + } else { + req.blockhash = pindex->GetBlockHash(); + pfrom->PushMessage(NetMsgType::GETBLOCKTXN, req); + } + } + } else { + if (fAlreadyInFlight) { + // We requested this block, but its far into the future, so our + // mempool will probably be useless - request the block normally + std::vector vInv(1); + vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); + pfrom->PushMessage(NetMsgType::GETDATA, vInv); + return true; + } else { + // If this was an announce-cmpctblock, we want the same treatment as a header message + // Dirty hack to process as if it were just a headers message (TODO: move message handling into their own functions) + std::vector headers; + headers.push_back(cmpctblock.header); + CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION); + vHeadersMsg << headers; + return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams); + } + } + + CheckBlockIndex(chainparams.GetConsensus()); + } + + else if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing + { + BlockTransactions resp; + vRecv >> resp; + + LOCK(cs_main); + + map::iterator> >::iterator it = mapBlocksInFlight.find(resp.blockhash); + if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock || + it->second.first != pfrom->GetId()) { + LogPrint("net", "Peer %d sent us block transactions for block we weren't expecting\n", pfrom->id); + return true; + } + + PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock; + CBlock block; + ReadStatus status = partialBlock.FillBlock(block, resp.txn); + if (status == READ_STATUS_INVALID) { + MarkBlockAsReceived(resp.blockhash); // Reset in-flight state in case of whitelist + Misbehaving(pfrom->GetId(), 100); + LogPrintf("Peer %d sent us invalid compact block/non-matching block transactions\n", pfrom->id); + return true; + } else if (status == READ_STATUS_FAILED) { + // Might have collided, fall back to getdata now :( + std::vector invs; + invs.push_back(CInv(MSG_BLOCK, resp.blockhash)); + pfrom->PushMessage(NetMsgType::GETDATA, invs); + } else { + CValidationState state; + ProcessNewBlock(state, chainparams, pfrom, &block, false, NULL); + int nDoS; + if (state.IsInvalid(nDoS)) { + assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes + pfrom->PushMessage(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(), + state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), block.GetHash()); + if (nDoS > 0) { + LOCK(cs_main); + Misbehaving(pfrom->GetId(), nDoS); + } + } + } + } + + else if (strCommand == NetMsgType::HEADERS && !fImporting && !fReindex) // Ignore headers received while importing { std::vector headers; @@ -5334,6 +5530,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, pindexLast->GetBlockHash().ToString(), pindexLast->nHeight); } if (vGetData.size() > 0) { + if (nodestate->fProvidesHeaderAndIDs && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN)) { + vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash); + } pfrom->PushMessage(NetMsgType::GETDATA, vGetData); } }