From 576d1b7134e557f9be59011c1e884123c8f48629 Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Mon, 18 Jul 2022 15:37:44 +0000 Subject: [PATCH] wallet: Add `BatchScanner` interface to `CValidationInterface` `CValidationInterface` listeners can either listen directly to `CValidationInterface::SyncTransaction` as they currently do, or they can listen to `CValidationInterface::GetBatchScanner` and then process transactions via `BatchScanner::SyncTransaction`. The latter approach allows listeners to perform trial decryption via whatever strategy is optimal for them. --- src/validationinterface.cpp | 189 +++++++++++++++++++++++++++++++++++- src/validationinterface.h | 85 +++++++++++++++- 2 files changed, 268 insertions(+), 6 deletions(-) diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 52db414c8..6d3d0630b 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -28,6 +28,7 @@ CMainSignals& GetMainSignals() void RegisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1)); + g_signals.GetBatchScanner.connect(boost::bind(&CValidationInterface::GetBatchScanner, pwalletIn)); g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3)); g_signals.EraseTransaction.connect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1)); g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1)); @@ -49,6 +50,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1)); g_signals.EraseTransaction.disconnect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1)); g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3)); + 
g_signals.GetBatchScanner.disconnect(boost::bind(&CValidationInterface::GetBatchScanner, pwalletIn)); g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1)); } @@ -62,10 +64,38 @@ void UnregisterAllValidationInterfaces() { g_signals.UpdatedTransaction.disconnect_all_slots(); g_signals.EraseTransaction.disconnect_all_slots(); g_signals.SyncTransaction.disconnect_all_slots(); + g_signals.GetBatchScanner.disconnect_all_slots(); g_signals.UpdatedBlockTip.disconnect_all_slots(); } -void SyncWithWallets(const CTransaction &tx, const CBlock *pblock, const int nHeight) { +void AddTxToBatches( + std::vector &batchScanners, + const CTransaction &tx, + const int nHeight) +{ + CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION); + ssTx << tx; + std::vector txBytes(ssTx.begin(), ssTx.end()); + for (auto& batchScanner : batchScanners) { + batchScanner->AddTransaction(tx, txBytes, nHeight); + } +} + +void FlushBatches(std::vector &batchScanners) { + for (auto& batchScanner : batchScanners) { + batchScanner->Flush(); + } +} + +void SyncWithWallets( + std::vector &batchScanners, + const CTransaction &tx, + const CBlock *pblock, + const int nHeight) +{ + for (auto& batchScanner : batchScanners) { + batchScanner->SyncTransaction(tx, pblock, nHeight); + } g_signals.SyncTransaction(tx, pblock, nHeight); } @@ -185,6 +215,155 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip) // network message processing thread. // + // The wallet inherited from Bitcoin Core was built around the following + // general workflow for moving from one chain tip to another: + // + // - For each block in the old chain, from its tip to the fork point: + // - For each transaction in the block: + // - 1️⃣ Trial-decrypt the transaction's shielded outputs. + // - If the transaction belongs to the wallet: + // - 2️⃣ Add or update the transaction, and mark it as dirty. + // - Update the wallet's view of the chain tip. 
+ // - 3️⃣ In `zcashd`, this is when we decrement note witnesses. + // - For each block in the new chain, from the fork point to its tip: + // - For each transaction that became conflicted by this block: + // - 4️⃣ Trial-decrypt the transaction's shielded outputs. + // - If the transaction belongs to the wallet: + // - 5️⃣ Add or update the transaction, and mark it as dirty. + // - For each transaction in the block: + // - 6️⃣ Trial-decrypt the transaction's shielded outputs. + // - If the transaction belongs to the wallet: + // - 7️⃣ Add or update the transaction, and mark it as dirty. + // - Update the wallet's view of the chain tip. + // - 8️⃣ In `zcashd`, this is when we increment note witnesses. + // - For each transaction in the mempool: + // - 9️⃣ Trial-decrypt the transaction's shielded outputs. + // - If the transaction belongs to the wallet: + // - 🅰️ Add or update the transaction, and mark it as dirty. + // + // Steps 2️⃣, 3️⃣, 5️⃣, 7️⃣, 8️⃣, and 🅰️ are where wallet state is updated, + // and the relative order of these updates must be preserved in order to + // avoid breaking any internal assumptions that the wallet makes. + // + // Steps 1️⃣, 4️⃣, 6️⃣, and 9️⃣ can be performed at any time, as long as + // their results are available when their respective conditionals are + // evaluated. We therefore refactor the above workflow to enable the + // trial-decryption work to be batched and parallelised: + // + // - For each block in the old chain, from its tip to the fork point: + // - For each transaction in the block: + // - Accumulate its Sprout, Sapling, and Orchard outputs. + // - For each block in the new chain, from the fork point to its tip: + // - For each transaction that became conflicted by this block: + // - Accumulate its Sprout, Sapling, and Orchard outputs. + // - For each transaction in the block: + // - Accumulate its Sprout, Sapling, and Orchard outputs. + // + // - 1️⃣4️⃣6️⃣9️⃣ Trial-decrypt the Sprout, Sapling, and Orchard outputs. 
+ // - This can split up and batch the work however is most efficient. + // + // - For each block in the old chain, from its tip to the fork point: + // - For each transaction in the block: + // - If the transaction has decrypted outputs, or transparent inputs + // that belong to the wallet: + // - 2️⃣ Add or update the transaction, and mark it as dirty. + // - Update the wallet's view of the chain tip. + // - 3️⃣ In `zcashd`, this is when we decrement note witnesses. + // - For each block in the new chain, from the fork point to its tip: + // - For each transaction that became conflicted by this block: + // - If the transaction has decrypted outputs, or transparent inputs + // that belong to the wallet: + // - 5️⃣ Add or update the transaction, and mark it as dirty. + // - For each transaction in the block: + // - If the transaction has decrypted outputs, or transparent inputs + // that belong to the wallet: + // - 7️⃣ Add or update the transaction, and mark it as dirty. + // - Update the wallet's view of the chain tip. + // - 8️⃣ In `zcashd`, this is when we increment note witnesses. + // - For each transaction in the mempool: + // - If the transaction has decrypted outputs, or transparent inputs + // that belong to the wallet: + // - 🅰️ Add or update the transaction, and mark it as dirty. + + // Get a new handle to the BatchScanner for each listener in each loop. + // This allows the listeners to alter their scanning logic over time, + // for example to add new incoming viewing keys. + auto batchScanners = GetMainSignals().GetBatchScanner(); + + if (!batchScanners.empty()) { + // Batch the shielded outputs across all blocks being processed. + // TODO: We can probably not bother trial-decrypting transactions + // in blocks being disconnected, or that are becoming conflicted, + // instead doing a plain "is this tx in the wallet" check. 
However, + // the logic in AddToWalletIfInvolvingMe would need to be carefully + // checked to ensure its side-effects are correctly preserved, so + // for now we maintain the previous behaviour of trial-decrypting + // everything. + + // Batch block disconnects. + auto pindexScan = pindexLastTip; + while (pindexScan && pindexScan != pindexFork) { + // Read block from disk. + CBlock block; + if (!ReadBlockFromDisk(block, pindexScan, chainParams.GetConsensus())) { + LogPrintf( + "*** %s: Failed to read block %s while collecting shielded outputs", + __func__, pindexScan->GetBlockHash().GetHex()); + uiInterface.ThreadSafeMessageBox( + _("Error: A fatal internal error occurred, see debug.log for details"), + "", CClientUIInterface::MSG_ERROR); + StartShutdown(); + return; + } + + // Batch transactions that went from 1-confirmed to 0-confirmed + // or conflicted. + for (const CTransaction &tx : block.vtx) { + AddTxToBatches(batchScanners, tx, pindexScan->nHeight); + } + + // On to the next block! + pindexScan = pindexScan->pprev; + } + + // Batch block connections. Process blockStack in the same order we + // do below, so batched work can be completed in roughly the order + // we need it. + for (auto it = blockStack.rbegin(); it != blockStack.rend(); ++it) { + const auto& blockData = *it; + + // Read block from disk. + CBlock block; + if (!ReadBlockFromDisk(block, blockData.pindex, chainParams.GetConsensus())) { + LogPrintf( + "*** %s: Failed to read block %s while collecting shielded outputs from block connects", + __func__, blockData.pindex->GetBlockHash().GetHex()); + uiInterface.ThreadSafeMessageBox( + _("Error: A fatal internal error occurred, see debug.log for details"), + "", CClientUIInterface::MSG_ERROR); + StartShutdown(); + return; + } + + // Batch transactions that went from mempool to conflicted: + for (const CTransaction &tx : blockData.txConflicted) { + AddTxToBatches(batchScanners, tx, blockData.pindex->nHeight + 1); + } + // ... 
and transactions that got confirmed: + for (const CTransaction &tx : block.vtx) { + AddTxToBatches(batchScanners, tx, blockData.pindex->nHeight); + } + } + + // Batch transactions in the mempool. + for (auto tx : recentlyAdded.first) { + AddTxToBatches(batchScanners, tx, pindexLastTip->nHeight + 1); + } + } + + // Ensure that all pending work has been started. + FlushBatches(batchScanners); + // Notify block disconnects while (pindexLastTip && pindexLastTip != pindexFork) { // Read block from disk. @@ -203,7 +382,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip) // Let wallets know transactions went from 1-confirmed to // 0-confirmed or conflicted: for (const CTransaction &tx : block.vtx) { - SyncWithWallets(tx, NULL, pindexLastTip->nHeight); + SyncWithWallets(batchScanners, tx, NULL, pindexLastTip->nHeight); } // Update cached incremental witnesses // This will take the cs_main lock in order to obtain the CBlockLocator @@ -237,11 +416,11 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip) // Tell wallet about transactions that went from mempool // to conflicted: for (const CTransaction &tx : blockData.txConflicted) { - SyncWithWallets(tx, NULL, blockData.pindex->nHeight + 1); + SyncWithWallets(batchScanners, tx, NULL, blockData.pindex->nHeight + 1); } // ... 
and about transactions that got confirmed: for (const CTransaction &tx : block.vtx) { - SyncWithWallets(tx, &block, blockData.pindex->nHeight); + SyncWithWallets(batchScanners, tx, &block, blockData.pindex->nHeight); } // Update cached incremental witnesses // This will take the cs_main lock in order to obtain the CBlockLocator @@ -262,7 +441,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip) // Notify transactions in the mempool for (auto tx : recentlyAdded.first) { try { - SyncWithWallets(tx, NULL, pindexLastTip->nHeight + 1); + SyncWithWallets(batchScanners, tx, NULL, pindexLastTip->nHeight + 1); } catch (const boost::thread_interrupted&) { throw; } catch (const std::exception& e) { diff --git a/src/validationinterface.h b/src/validationinterface.h index dc472f5b6..9f387258f 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -24,6 +24,42 @@ class CValidationInterface; class CValidationState; class uint256; +class BatchScanner { +public: + /** + * Adds a transaction to the batch scanner. + * + * `txBytes` is the serialized encoding of `tx`, and `nHeight` is the block + * height the caller associates with it: the containing block's height, or one + * more than a reference block's height for conflicted and mempool transactions. + */ + virtual void AddTransaction( + const CTransaction &tx, + const std::vector &txBytes, + const int nHeight) = 0; + + /** + * Flushes any pending batches. + * + * After calling this, every transaction passed to `AddTransaction` should + * have its result available when the matching call to `SyncTransaction` is + * made. + */ + virtual void Flush() = 0; + + /** + * Notifies the batch scanner of updated transaction data (transaction, and + * optionally the block it is found in). + * + * This will be called with transactions in the same order as they were + * passed to `AddTransaction`. 
+ */ + virtual void SyncTransaction( + const CTransaction &tx, + const CBlock *pblock, + const int nHeight) = 0; +}; + struct MerkleFrontiers { SproutMerkleTree sprout; SaplingMerkleTree sapling; @@ -42,6 +78,7 @@ void UnregisterAllValidationInterfaces(); class CValidationInterface { protected: virtual void UpdatedBlockTip(const CBlockIndex *pindex) {} + virtual BatchScanner* GetBatchScanner() { return nullptr; } virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock, const int nHeight) {} virtual void EraseFromWallet(const uint256 &hash) {} virtual void ChainTip(const CBlockIndex *pindex, const CBlock *pblock, std::optional added) {} @@ -56,10 +93,56 @@ protected: friend void ::UnregisterAllValidationInterfaces(); }; +// aggregate_non_null_values is a combiner which places any non-nullptr values +// returned from slots into a container. +template +struct aggregate_non_null_values +{ + typedef Container result_type; + + template + Container operator()(InputIterator first, InputIterator last) const + { + Container values; + + while (first != last) { + auto ptr = *first; + if (ptr != nullptr) { + values.push_back(ptr); + } + ++first; + } + return values; + } +}; + struct CMainSignals { /** Notifies listeners of updated block chain tip */ boost::signals2::signal UpdatedBlockTip; - /** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */ + /** + * Requests a pointer to the listener's batch scanner for shielded outputs, + * if it has one. + * + * The listener is responsible for managing the memory of the batch scanner. + * In practice each listener will have a single persistent batch scanner. + * + * This signal is called at the start of each notification loop, which runs + * on integer second boundaries. This is an opportunity for the listener to + * perform any updating of the batch scanner's internal state (such as + * updating its set of incoming viewing keys). 
+ * + * Listeners of this signal should not listen to `SyncTransaction` or they + * will be notified about transactions twice. + */ + boost::signals2::signal< + BatchScanner* (), + aggregate_non_null_values>> GetBatchScanner; + /** + * Notifies listeners of updated transaction data (transaction, and optionally the block it is found in). + * + * Listeners of this signal should not listen to `GetBatchScanner` or they + * will be notified about transactions twice. + */ boost::signals2::signal SyncTransaction; /** Notifies listeners of an erased transaction (currently disabled, requires transaction replacement). */ boost::signals2::signal EraseTransaction;