diff --git a/qa/rpc-tests/p2p_txexpiringsoon.py b/qa/rpc-tests/p2p_txexpiringsoon.py index 6472b6fba..4cc4a38c7 100755 --- a/qa/rpc-tests/p2p_txexpiringsoon.py +++ b/qa/rpc-tests/p2p_txexpiringsoon.py @@ -51,7 +51,7 @@ class TxExpiringSoonTest(BitcoinTestFramework): testnode.send_message(msg_mempool()) # Sync up with node after p2p messages delivered - testnode.sync_with_ping() + testnode.sync_with_ping(waiting_for=lambda x: x.last_inv) with mininode_lock: msg = testnode.last_inv diff --git a/qa/rpc-tests/shorter_block_times.py b/qa/rpc-tests/shorter_block_times.py index d3ff5e079..41c25a6df 100755 --- a/qa/rpc-tests/shorter_block_times.py +++ b/qa/rpc-tests/shorter_block_times.py @@ -63,6 +63,7 @@ class ShorterBlockTimes(BitcoinTestFramework): myopid = self.nodes[0].z_sendmany(node0_taddr, recipients, 1, 0) txid = wait_and_assert_operationid_status(self.nodes[0], myopid) assert_equal(147, self.nodes[0].getrawtransaction(txid, 1)['expiryheight']) # height + 1 + 40 + self.sync_all() # Ensure the transaction has propagated to node 1 self.nodes[1].generate(1) self.sync_all() assert_equal(20, Decimal(self.nodes[0].z_gettotalbalance()['private'])) diff --git a/qa/rpc-tests/tx_expiry_helper.py b/qa/rpc-tests/tx_expiry_helper.py index 94a6cc2c5..5b6594add 100755 --- a/qa/rpc-tests/tx_expiry_helper.py +++ b/qa/rpc-tests/tx_expiry_helper.py @@ -66,12 +66,13 @@ class TestNode(NodeConnCB): # The following function is mostly copied from p2p-acceptblock.py # Sync up with the node after delivery of a message - def sync_with_ping(self, timeout=30): + def sync_with_ping(self, timeout=30, waiting_for=None): self.connection.send_message(msg_ping(nonce=self.ping_counter)) sleep_time = 0.05 while timeout > 0: with mininode_lock: - if self.last_pong.nonce == self.ping_counter: + ready = True if waiting_for is None else waiting_for(self) is not None + if ready and self.last_pong.nonce == self.ping_counter: self.ping_counter += 1 return time.sleep(sleep_time) diff --git a/src/main.cpp b/src/main.cpp index 25b7cfd60..e3f2102d6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6150,7 +6150,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam // Send stream from relay memory { LOCK(cs_mapRelay); - map::iterator mi = mapRelay.find(inv); + map::iterator mi = mapRelay.find(inv.hash); if (mi != mapRelay.end()) { pfrom->PushMessage(inv.GetCommand(), (*mi).second); pushed = true; @@ -6158,10 +6158,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam } if (!pushed && inv.type == MSG_TX) { if (isInMempool) { - CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); - ss.reserve(1000); - ss << tx; - pfrom->PushMessage("tx", ss); + pfrom->PushMessage("tx", tx); pushed = true; } } @@ -6673,7 +6670,7 @@ bool static ProcessMessage(const CChainParams& chainparams, CNode* pfrom, string CValidationState state; pfrom->setAskFor.erase(inv.hash); - mapAlreadyAskedFor.erase(inv); + mapAlreadyAskedFor.erase(inv.hash); if (!AlreadyHave(inv) && AcceptToMemoryPool(chainparams, mempool, state, tx, true, &fMissingInputs)) { @@ -7301,7 +7298,7 @@ bool ProcessMessages(const CChainParams& chainparams, CNode* pfrom) } -bool SendMessages(const Consensus::Params& params, CNode* pto, bool fSendTrickle) +bool SendMessages(const Consensus::Params& params, CNode* pto) { { // Don't send anything until we get its version message @@ -7342,28 +7339,17 @@ bool SendMessages(const Consensus::Params& params, CNode* pto, bool fSendTrickle return true; // Address refresh broadcast - static int64_t nLastRebroadcast; - if (!IsInitialBlockDownload(params) && (GetTime() - nLastRebroadcast > 24 * 60 * 60)) - { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) - { - // Periodically clear addrKnown to allow refresh broadcasts - if (nLastRebroadcast) - pnode->addrKnown.reset(); - - // Rebroadcast our address - AdvertizeLocal(pnode); - } - if (!vNodes.empty()) - nLastRebroadcast = GetTime(); + int64_t nNow = GetTimeMicros(); + if (!IsInitialBlockDownload(params) && pto->nNextLocalAddrSend < nNow) { + AdvertizeLocal(pto); + pto->nNextLocalAddrSend = PoissonNextSend(nNow, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); } // // Message: addr // - if (fSendTrickle) - { + if (pto->nNextAddrSend < nNow) { + pto->nNextAddrSend = PoissonNextSend(nNow, AVG_ADDRESS_BROADCAST_INTERVAL); vector vAddr; vAddr.reserve(pto->vAddrToSend.size()); for (const CAddress& addr : pto->vAddrToSend) @@ -7443,8 +7429,13 @@ bool SendMessages(const Consensus::Params& params, CNode* pto, bool fSendTrickle vector vInv; vector vInvWait; { + bool fSendTrickle = pto->fWhitelisted; + if (pto->nNextInvSend < nNow) { + fSendTrickle = true; + pto->nNextInvSend = PoissonNextSend(nNow, AVG_INVENTORY_BROADCAST_INTERVAL); + } LOCK(pto->cs_inventory); - vInv.reserve(pto->vInventoryToSend.size()); + vInv.reserve(std::min(1000, pto->vInventoryToSend.size())); vInvWait.reserve(pto->vInventoryToSend.size()); for (const CInv& inv : pto->vInventoryToSend) { @@ -7484,7 +7475,7 @@ bool SendMessages(const Consensus::Params& params, CNode* pto, bool fSendTrickle pto->PushMessage("inv", vInv); // Detect whether we're stalling - int64_t nNow = GetTimeMicros(); + nNow = GetTimeMicros(); if (!pto->fDisconnect && state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection diff --git a/src/main.h b/src/main.h index e9aa2b80a..6be14c774 100644 --- a/src/main.h +++ b/src/main.h @@ -114,6 +114,14 @@ static const unsigned int WITNESS_WRITE_INTERVAL = 10 * 60; static const unsigned int WITNESS_WRITE_UPDATES = 10000; /** Maximum length of reject messages. */ static const unsigned int MAX_REJECT_MESSAGE_LENGTH = 111; +/** Average delay between local address broadcasts in seconds. */ +static const unsigned int AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL = 24 * 24 * 60; +/** Average delay between peer address broadcasts in seconds. */ +static const unsigned int AVG_ADDRESS_BROADCAST_INTERVAL = 30; +/** Average delay between trickled inventory broadcasts in seconds. + * Blocks, whitelisted receivers, and a random 25% of transactions bypass this. */ +static const unsigned int AVG_INVENTORY_BROADCAST_INTERVAL = 5; + static const unsigned int DEFAULT_LIMITFREERELAY = 15; static const bool DEFAULT_RELAYPRIORITY = false; static const int64_t DEFAULT_MAX_TIP_AGE = 24 * 60 * 60; @@ -256,9 +264,8 @@ bool ProcessMessages(const CChainParams& chainparams, CNode* pfrom); * * @param[in] params Active chain parameters. * @param[in] pto The node which we are sending messages to. - * @param[in] fSendTrickle When true send the trickled data, otherwise trickle the data until true. */ -bool SendMessages(const Consensus::Params& params, CNode* pto, bool fSendTrickle); +bool SendMessages(const Consensus::Params& params, CNode* pto); /** Run an instance of the script checking thread */ void ThreadScriptCheck(); /** Check whether we are doing an initial block download (synchronizing from disk or network) */ diff --git a/src/net.cpp b/src/net.cpp index 08fb6f9fe..3e36ded54 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -30,6 +30,8 @@ #include +#include + #include // Dump addresses to peers.dat and banlist.dat every 15 minutes (900s) @@ -71,10 +73,10 @@ std::string strSubVersion; vector vNodes; CCriticalSection cs_vNodes; -map mapRelay; -deque > vRelayExpiration; +map mapRelay; +deque > vRelayExpiration; CCriticalSection cs_mapRelay; -limitedmap mapAlreadyAskedFor(MAX_INV_SZ); +limitedmap mapAlreadyAskedFor(MAX_INV_SZ); static deque vOneShots; static CCriticalSection cs_vOneShots; @@ -1692,11 +1694,6 @@ void ThreadMessageHandler() } } - // Poll the connected nodes for messages - CNode* pnodeTrickle = NULL; - if (!vNodesCopy.empty()) - pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; - bool fSleep = true; for (CNode* pnode : vNodesCopy) @@ -1729,7 +1726,7 @@ void ThreadMessageHandler() { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - g_signals.SendMessages(chainparams.GetConsensus(), pnode, pnode == pnodeTrickle || pnode->fWhitelisted); + g_signals.SendMessages(chainparams.GetConsensus(), pnode); } boost::this_thread::interruption_point(); } @@ -2036,14 +2033,6 @@ instance_of_cnetcleanup; void RelayTransaction(const CTransaction& tx) -{ - CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); - ss.reserve(10000); - ss << tx; - RelayTransaction(tx, ss); -} - -void RelayTransaction(const CTransaction& tx, const CDataStream& ss) { CInv inv(MSG_TX, tx.GetHash()); { @@ -2055,9 +2044,8 @@ void RelayTransaction(const CTransaction& tx, const CDataStream& ss) vRelayExpiration.pop_front(); } - // Save original serialized message so newer versions are preserved - mapRelay.insert(std::make_pair(inv, ss)); - vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, inv)); + mapRelay.insert(std::make_pair(inv.hash, tx)); + vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, inv.hash)); } LOCK(cs_vNodes); for (CNode* pnode : vNodes) @@ -2258,6 +2246,9 @@ CNode::CNode(SOCKET hSocketIn, const CAddress& addrIn, const std::string& addrNa nStartingHeight = -1; filterInventoryKnown.reset(); fGetAddr = false; + nNextLocalAddrSend = 0; + nNextAddrSend = 0; + nNextInvSend = 0; fRelayTxes = false; fSentAddr = false; pfilter = new CBloomFilter(); @@ -2318,7 +2309,7 @@ void CNode::AskFor(const CInv& inv) // We're using mapAskFor as a priority queue, // the key is the earliest time the request can be sent int64_t nRequestTime; - limitedmap::const_iterator it = mapAlreadyAskedFor.find(inv); + limitedmap::const_iterator it = mapAlreadyAskedFor.find(inv.hash); if (it != mapAlreadyAskedFor.end()) nRequestTime = it->second; else @@ -2337,7 +2328,7 @@ void CNode::AskFor(const CInv& inv) if (it != mapAlreadyAskedFor.end()) mapAlreadyAskedFor.update(it, nRequestTime); else - mapAlreadyAskedFor.insert(std::make_pair(inv, nRequestTime)); + mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime)); mapAskFor.insert(std::make_pair(nRequestTime, inv)); } @@ -2418,3 +2409,7 @@ void CNode::EndMessage() UNLOCK_FUNCTION(cs_vSend) return CSipHasher(k0, k1).Write(&vchNetGroup[0], vchNetGroup.size()).Finalize(); } + +int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) { + return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5); +} diff --git a/src/net.h b/src/net.h index 0dd4d9bc4..499fd7a6d 100644 --- a/src/net.h +++ b/src/net.h @@ -117,7 +117,7 @@ struct CNodeSignals { boost::signals2::signal GetHeight; boost::signals2::signal ProcessMessages; - boost::signals2::signal SendMessages; + boost::signals2::signal SendMessages; boost::signals2::signal InitializeNode; boost::signals2::signal FinalizeNode; }; @@ -164,10 +164,10 @@ extern int nMaxConnections; extern std::vector vNodes; extern CCriticalSection cs_vNodes; -extern std::map mapRelay; -extern std::deque > vRelayExpiration; +extern std::map mapRelay; +extern std::deque > vRelayExpiration; extern CCriticalSection cs_mapRelay; -extern limitedmap mapAlreadyAskedFor; +extern limitedmap mapAlreadyAskedFor; extern std::vector vAddedNodes; extern CCriticalSection cs_vAddedNodes; @@ -336,6 +336,8 @@ public: CRollingBloomFilter addrKnown; bool fGetAddr; std::set setKnown; + int64_t nNextAddrSend; + int64_t nNextLocalAddrSend; // inventory based relay CRollingBloomFilter filterInventoryKnown; @@ -343,6 +345,7 @@ public: CCriticalSection cs_inventory; std::set setAskFor; std::multimap mapAskFor; + int64_t nNextInvSend; // Ping time measurement: // The pong reply we're expecting, or 0 if no pong expected. @@ -720,7 +723,9 @@ public: class CTransaction; void RelayTransaction(const CTransaction& tx); -void RelayTransaction(const CTransaction& tx, const CDataStream& ss); +/** Return a timestamp in the future (in microseconds) for exponentially distributed events. */ +int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds); + #endif // BITCOIN_NET_H diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index 89b5220e6..b006c2160 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -54,7 +54,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning) CNode dummyNode1(INVALID_SOCKET, addr1, "", true); dummyNode1.nVersion = 1; Misbehaving(dummyNode1.GetId(), 100); // Should get banned - SendMessages(params, &dummyNode1, false); + SendMessages(params, &dummyNode1); BOOST_CHECK(CNode::IsBanned(addr1)); BOOST_CHECK(!CNode::IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned @@ -62,11 +62,11 @@ BOOST_AUTO_TEST_CASE(DoS_banning) CNode dummyNode2(INVALID_SOCKET, addr2, "", true); dummyNode2.nVersion = 1; Misbehaving(dummyNode2.GetId(), 50); - SendMessages(params, &dummyNode2, false); + SendMessages(params, &dummyNode2); BOOST_CHECK(!CNode::IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(CNode::IsBanned(addr1)); // ... but 1 still should be Misbehaving(dummyNode2.GetId(), 50); - SendMessages(params, &dummyNode2, false); + SendMessages(params, &dummyNode2); BOOST_CHECK(CNode::IsBanned(addr2)); } @@ -79,13 +79,13 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) CNode dummyNode1(INVALID_SOCKET, addr1, "", true); dummyNode1.nVersion = 1; Misbehaving(dummyNode1.GetId(), 100); - SendMessages(params, &dummyNode1, false); + SendMessages(params, &dummyNode1); BOOST_CHECK(!CNode::IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 10); - SendMessages(params, &dummyNode1, false); + SendMessages(params, &dummyNode1); BOOST_CHECK(!CNode::IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 1); - SendMessages(params, &dummyNode1, false); + SendMessages(params, &dummyNode1); BOOST_CHECK(CNode::IsBanned(addr1)); mapArgs.erase("-banscore"); } @@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) dummyNode.nVersion = 1; Misbehaving(dummyNode.GetId(), 100); - SendMessages(params, &dummyNode, false); + SendMessages(params, &dummyNode); BOOST_CHECK(CNode::IsBanned(addr)); SetMockTime(nStartTime+60*60); diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 112a065eb..a90585911 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -59,6 +59,8 @@ CTxMemPoolEntry::GetPriority(unsigned int currentHeight) const CTxMemPool::CTxMemPool(const CFeeRate& _minRelayFee) : nTransactionsUpdated(0) { + _clear(); // unlocked clear + // Sanity checks off by default for performance, because otherwise // accepting transactions becomes O(N^2) where N is the number // of transactions in the pool @@ -494,9 +496,8 @@ void CTxMemPool::removeWithoutBranchId(uint32_t nMemPoolBranchId) } } -void CTxMemPool::clear() +void CTxMemPool::_clear() { - LOCK(cs); mapTx.clear(); mapNextTx.clear(); totalTxSize = 0; @@ -504,6 +505,12 @@ void CTxMemPool::clear() ++nTransactionsUpdated; } +void CTxMemPool::clear() +{ + LOCK(cs); + _clear(); +} + void CTxMemPool::check(const CCoinsViewCache *pcoins) const { if (nCheckFrequency == 0) diff --git a/src/txmempool.h b/src/txmempool.h index 530ce44a7..8c3d396b1 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -210,6 +210,7 @@ public: std::list& conflicts, bool fCurrentEstimate = true); void removeWithoutBranchId(uint32_t nMemPoolBranchId); void clear(); + void _clear(); // unlocked void queryHashes(std::vector& vtxid); void pruneSpent(const uint256& hash, CCoins &coins); unsigned int GetTransactionsUpdated() const;