SyncTransaction->TxAddedToMempool/BlockConnected/Disconnected

This simplifies fixing the wallet-returns-stale-info issue as we
can now hold cs_wallet across an entire block instead of only
per-tx (though we only actually do so in the next commit).

This change also removes the NOT_IN_BLOCK constant in favor of only
passing the CBlockIndex* parameter to SyncTransactions when a new
block is being connected, instead of also when a block is being
disconnected.

This change adds a parameter to BlockConnectedDisconnected which
lists the transactions which were removed from mempool due to
confliction as a result of this operation. While its somewhat of a
shame to make block-validation-logic generate a list of mempool
changes to be included in its generated callbacks, fixing this isnt
too hard.

Further in this change-set, CValidationInterface starts listening
to mempool directly, placing it in the middle and giving it a bit
of logic to know how to route notifications from block-validation,
mempool, etc (though not listening for conflicted-removals yet).
This commit is contained in:
Matt Corallo 2017-03-29 21:12:42 -04:00
parent f404334910
commit 461e49fee2
9 changed files with 109 additions and 63 deletions

View File

@ -744,21 +744,23 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn) : connman(connmanI
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
}
void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int nPosInBlock) {
if (nPosInBlock == CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK)
return;
void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex, const std::vector<CTransactionRef>& vtxConflicted) {
LOCK(cs_main);
std::vector<uint256> vOrphanErase;
// Which orphan pool entries must we evict?
for (size_t j = 0; j < tx.vin.size(); j++) {
auto itByPrev = mapOrphanTransactionsByPrev.find(tx.vin[j].prevout);
if (itByPrev == mapOrphanTransactionsByPrev.end()) continue;
for (auto mi = itByPrev->second.begin(); mi != itByPrev->second.end(); ++mi) {
const CTransaction& orphanTx = *(*mi)->second.tx;
const uint256& orphanHash = orphanTx.GetHash();
vOrphanErase.push_back(orphanHash);
for (const CTransactionRef& ptx : pblock->vtx) {
const CTransaction& tx = *ptx;
// Which orphan pool entries must we evict?
for (size_t j = 0; j < tx.vin.size(); j++) {
auto itByPrev = mapOrphanTransactionsByPrev.find(tx.vin[j].prevout);
if (itByPrev == mapOrphanTransactionsByPrev.end()) continue;
for (auto mi = itByPrev->second.begin(); mi != itByPrev->second.end(); ++mi) {
const CTransaction& orphanTx = *(*mi)->second.tx;
const uint256& orphanHash = orphanTx.GetHash();
vOrphanErase.push_back(orphanHash);
}
}
}

View File

@ -30,7 +30,7 @@ private:
public:
PeerLogicValidation(CConnman* connmanIn);
virtual void SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int nPosInBlock);
virtual void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted);
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
virtual void BlockChecked(const CBlock& block, const CValidationState& state);
virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock);

View File

@ -949,7 +949,7 @@ bool AcceptToMemoryPoolWorker(CTxMemPool& pool, CValidationState& state, const C
}
}
GetMainSignals().SyncTransaction(tx, NULL, CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK);
GetMainSignals().TransactionAddedToMempool(ptx);
return true;
}
@ -2120,7 +2120,8 @@ bool static DisconnectTip(CValidationState& state, const CChainParams& chainpara
CBlockIndex *pindexDelete = chainActive.Tip();
assert(pindexDelete);
// Read block from disk.
CBlock block;
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
CBlock& block = *pblock;
if (!ReadBlockFromDisk(block, pindexDelete, chainparams.GetConsensus()))
return AbortNode(state, "Failed to read block");
// Apply the block atomically to the chain state.
@ -2162,9 +2163,7 @@ bool static DisconnectTip(CValidationState& state, const CChainParams& chainpara
UpdateTip(pindexDelete->pprev, chainparams);
// Let wallets know transactions went from 1-confirmed to
// 0-confirmed or conflicted:
for (const auto& tx : block.vtx) {
GetMainSignals().SyncTransaction(*tx, pindexDelete->pprev, CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK);
}
GetMainSignals().BlockDisconnected(pblock);
return true;
}
@ -2511,29 +2510,9 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams,
pindexFork = chainActive.FindFork(pindexOldTip);
fInitialDownload = IsInitialBlockDownload();
// TODO: Temporarily ensure that mempool removals are notified before
// connected transactions. This shouldn't matter, but the abandoned
// state of transactions in our wallet is currently cleared when we
// receive another notification and there is a race condition where
// notification of a connected conflict might cause an outside process
// to abandon a transaction and then have it inadvertently cleared by
// the notification that the conflicted transaction was evicted.
// throw all transactions though the signal-interface
auto blocksConnected = connectTrace.GetBlocksConnected();
for (const PerBlockConnectTrace& trace : blocksConnected) {
assert(trace.conflictedTxs);
for (const auto& tx : *trace.conflictedTxs) {
GetMainSignals().SyncTransaction(*tx, NULL, CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK);
}
}
// Transactions in the connected block are notified
for (const PerBlockConnectTrace& trace : blocksConnected) {
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
assert(trace.pblock && trace.pindex);
const CBlock& block = *(trace.pblock);
for (unsigned int i = 0; i < block.vtx.size(); i++)
GetMainSignals().SyncTransaction(*block.vtx[i], trace.pindex, i);
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, *trace.conflictedTxs);
}
}
// When we reach this point, we switched to a new tip (stored in pindexNewTip).

View File

@ -14,7 +14,9 @@ CMainSignals& GetMainSignals()
void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3));
g_signals.TransactionAddedToMempool.connect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
g_signals.BlockConnected.connect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
g_signals.BlockDisconnected.connect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
g_signals.SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
g_signals.Inventory.connect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
@ -33,7 +35,9 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.Inventory.disconnect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
g_signals.SetBestChain.disconnect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3));
g_signals.TransactionAddedToMempool.disconnect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
g_signals.BlockConnected.disconnect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
g_signals.BlockDisconnected.disconnect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
g_signals.NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
@ -46,7 +50,9 @@ void UnregisterAllValidationInterfaces() {
g_signals.Inventory.disconnect_all_slots();
g_signals.SetBestChain.disconnect_all_slots();
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.SyncTransaction.disconnect_all_slots();
g_signals.TransactionAddedToMempool.disconnect_all_slots();
g_signals.BlockConnected.disconnect_all_slots();
g_signals.BlockDisconnected.disconnect_all_slots();
g_signals.UpdatedBlockTip.disconnect_all_slots();
g_signals.NewPoWValidBlock.disconnect_all_slots();
}

View File

@ -10,13 +10,14 @@
#include <boost/shared_ptr.hpp>
#include <memory>
#include "primitives/transaction.h" // CTransaction(Ref)
class CBlock;
class CBlockIndex;
struct CBlockLocator;
class CBlockIndex;
class CConnman;
class CReserveScript;
class CTransaction;
class CValidationInterface;
class CValidationState;
class uint256;
@ -33,7 +34,9 @@ void UnregisterAllValidationInterfaces();
class CValidationInterface {
protected:
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
virtual void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock) {}
virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
virtual void SetBestChain(const CBlockLocator &locator) {}
virtual void UpdatedTransaction(const uint256 &hash) {}
virtual void Inventory(const uint256 &hash) {}
@ -50,17 +53,15 @@ protected:
struct CMainSignals {
/** Notifies listeners of updated block chain tip */
boost::signals2::signal<void (const CBlockIndex *, const CBlockIndex *, bool fInitialDownload)> UpdatedBlockTip;
/** A posInBlock value for SyncTransaction calls for transactions not
* included in connected blocks such as transactions removed from mempool,
* accepted to mempool or appearing in disconnected blocks.*/
static const int SYNC_TRANSACTION_NOT_IN_BLOCK = -1;
/** Notifies listeners of updated transaction data (transaction, and
* optionally the block it is found in). Called with block data when
* transaction is included in a connected block, and without block data when
* transaction was accepted to mempool, removed from mempool (only when
* removal was due to conflict from connected block), or appeared in a
* disconnected block.*/
boost::signals2::signal<void (const CTransaction &, const CBlockIndex *pindex, int posInBlock)> SyncTransaction;
/** Notifies listeners of a transaction having been added to mempool. */
boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
*/
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef> &)> BlockConnected;
/** Notifies listeners of a block being disconnected */
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &)> BlockDisconnected;
/** Notifies listeners of an updated transaction without new data (for now: a coinbase potentially becoming visible). */
boost::signals2::signal<void (const uint256 &)> UpdatedTransaction;
/** Notifies listeners of a new active block chain. */

View File

@ -1116,11 +1116,10 @@ void CWallet::MarkConflicted(const uint256& hashBlock, const uint256& hashTx)
}
}
void CWallet::SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock)
{
LOCK2(cs_main, cs_wallet);
void CWallet::SyncTransaction(const CTransactionRef& ptx, const CBlockIndex *pindexBlockConnected, int posInBlock) {
const CTransaction& tx = *ptx;
if (!AddToWalletIfInvolvingMe(tx, pindex, posInBlock, true))
if (!AddToWalletIfInvolvingMe(tx, pindexBlockConnected, posInBlock, true))
return; // Not one of ours
// If a transaction changes 'conflicted' state, that changes the balance
@ -1133,6 +1132,38 @@ void CWallet::SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex,
}
}
void CWallet::TransactionAddedToMempool(const CTransactionRef& ptx) {
LOCK2(cs_main, cs_wallet);
SyncTransaction(ptx, NULL, -1);
}
void CWallet::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) {
// TODO: Tempoarily ensure that mempool removals are notified before
// connected transactions. This shouldn't matter, but the abandoned
// state of transactions in our wallet is currently cleared when we
// receive another notification and there is a race condition where
// notification of a connected conflict might cause an outside process
// to abandon a transaction and then have it inadvertantly cleared by
// the notification that the conflicted transaction was evicted.
for (const CTransactionRef& ptx : vtxConflicted) {
LOCK2(cs_main, cs_wallet);
SyncTransaction(ptx, NULL, -1);
}
for (size_t i = 0; i < pblock->vtx.size(); i++) {
LOCK2(cs_main, cs_wallet);
SyncTransaction(pblock->vtx[i], pindex, i);
}
}
void CWallet::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) {
for (const CTransactionRef& ptx : pblock->vtx) {
LOCK2(cs_main, cs_wallet);
SyncTransaction(ptx, NULL, -1);
}
}
isminetype CWallet::IsMine(const CTxIn &txin) const
{

View File

@ -661,6 +661,9 @@ private:
void SyncMetaData(std::pair<TxSpends::iterator, TxSpends::iterator>);
/* Used by TransactionAddedToMemorypool/BlockConnected/Disconnected */
void SyncTransaction(const CTransactionRef& tx, const CBlockIndex *pindexBlockConnected, int posInBlock);
/* the HD chain data model (external chain counters) */
CHDChain hdChain;
@ -849,7 +852,9 @@ public:
void MarkDirty();
bool AddToWallet(const CWalletTx& wtxIn, bool fFlushOnClose=true);
bool LoadToWallet(const CWalletTx& wtxIn);
void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock) override;
void TransactionAddedToMempool(const CTransactionRef& tx) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
bool AddToWalletIfInvolvingMe(const CTransaction& tx, const CBlockIndex* pIndex, int posInBlock, bool fUpdate);
CBlockIndex* ScanForWalletTransactions(CBlockIndex* pindexStart, bool fUpdate = false);
void ReacceptWalletTransactions();

View File

@ -144,8 +144,12 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
}
}
void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
{
// Used by BlockConnected and BlockDisconnected as well, because they're
// all the same external callback.
const CTransaction& tx = *ptx;
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
@ -160,3 +164,19 @@ void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CB
}
}
}
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction added in the block
TransactionAddedToMempool(ptx);
}
}
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction removed in block disconnection
TransactionAddedToMempool(ptx);
}
}

View File

@ -25,7 +25,9 @@ protected:
void Shutdown();
// CValidationInterface
void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock);
void TransactionAddedToMempool(const CTransactionRef& tx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted);
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock);
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
private: