wallet: Add `BatchScanner` interface to `CValidationInterface`

`CValidationInterface` listeners can either listen directly to
`CValidationInterface::SyncTransaction` as they currently do, or they
can listen to `CValidationInterface::InitBatchScanner` and then process
transactions via `BatchScanner::SyncTransaction`. The latter approach
allows listeners to perform trial decryption via whatever strategy is
most optimal for them.
This commit is contained in:
Jack Grigg 2022-07-18 15:37:44 +00:00
parent 768850e237
commit 576d1b7134
2 changed files with 268 additions and 6 deletions

View File

@ -28,6 +28,7 @@ CMainSignals& GetMainSignals()
void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
g_signals.GetBatchScanner.connect(boost::bind(&CValidationInterface::GetBatchScanner, pwalletIn));
g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3));
g_signals.EraseTransaction.connect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1));
g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
@ -49,6 +50,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
g_signals.EraseTransaction.disconnect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1));
g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2, _3));
g_signals.GetBatchScanner.disconnect(boost::bind(&CValidationInterface::GetBatchScanner, pwalletIn));
g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
}
@ -62,10 +64,38 @@ void UnregisterAllValidationInterfaces() {
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.EraseTransaction.disconnect_all_slots();
g_signals.SyncTransaction.disconnect_all_slots();
g_signals.GetBatchScanner.disconnect_all_slots();
g_signals.UpdatedBlockTip.disconnect_all_slots();
}
void SyncWithWallets(const CTransaction &tx, const CBlock *pblock, const int nHeight) {
void AddTxToBatches(
std::vector<BatchScanner*> &batchScanners,
const CTransaction &tx,
const int nHeight)
{
CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION);
ssTx << tx;
std::vector<unsigned char> txBytes(ssTx.begin(), ssTx.end());
for (auto& batchScanner : batchScanners) {
batchScanner->AddTransaction(tx, txBytes, nHeight);
}
}
void FlushBatches(std::vector<BatchScanner*> &batchScanners) {
for (auto& batchScanner : batchScanners) {
batchScanner->Flush();
}
}
void SyncWithWallets(
std::vector<BatchScanner*> &batchScanners,
const CTransaction &tx,
const CBlock *pblock,
const int nHeight)
{
for (auto& batchScanner : batchScanners) {
batchScanner->SyncTransaction(tx, pblock, nHeight);
}
g_signals.SyncTransaction(tx, pblock, nHeight);
}
@ -185,6 +215,155 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// network message processing thread.
//
// The wallet inherited from Bitcoin Core was built around the following
// general workflow for moving from one chain tip to another:
//
// - For each block in the old chain, from its tip to the fork point:
// - For each transaction in the block:
// - 1⃣ Trial-decrypt the transaction's shielded outputs.
// - If the transaction belongs to the wallet:
// - 2⃣ Add or update the transaction, and mark it as dirty.
// - Update the wallet's view of the chain tip.
// - 3⃣ In `zcashd`, this is when we decrement note witnesses.
// - For each block in the new chain, from the fork point to its tip:
// - For each transaction that became conflicted by this block:
// - 4⃣ Trial-decrypt the transaction's shielded outputs.
// - If the transaction belongs to the wallet:
// - 5⃣ Add or update the transaction, and mark it as dirty.
// - For each transaction in the block:
// - 6⃣ Trial-decrypt the transaction's shielded outputs.
// - If the transaction belongs to the wallet:
// - 7⃣ Add or update the transaction, and mark it as dirty.
// - Update the wallet's view of the chain tip.
// - 8⃣ In `zcashd`, this is when we increment note witnesses.
// - For each transaction in the mempool:
// - 9⃣ Trial-decrypt the transaction's shielded outputs.
// - If the transaction belongs to the wallet:
// - 🅰️ Add or update the transaction, and mark it as dirty.
//
// Steps 2⃣, 3⃣, 5⃣, 7⃣, 8⃣, and 🅰️ are where wallet state is updated,
// and the relative order of these updates must be preserved in order to
// avoid breaking any internal assumptions that the wallet makes.
//
// Steps 1⃣, 4⃣, 6⃣, and 9⃣ can be performed at any time, as long as
// their results are available when their respective conditionals are
// evaluated. We therefore refactor the above workflow to enable the
// trial-decryption work to be batched and parallelised:
//
// - For each block in the old chain, from its tip to the fork point:
// - For each transaction in the block:
// - Accumulate its Sprout, Sapling, and Orchard outputs.
// - For each block in the new chain, from the fork point to its tip:
// - For each transaction that became conflicted by this block:
// - Accumulate its Sprout, Sapling, and Orchard outputs.
// - For each transaction in the block:
// - Accumulate its Sprout, Sapling, and Orchard outputs.
//
// - 1⃣4⃣6⃣9⃣ Trial-decrypt the Sprout, Sapling, and Orchard outputs.
// - This can split up and batch the work however is most efficient.
//
// - For each block in the old chain, from its tip to the fork point:
// - For each transaction in the block:
// - If the transaction has decrypted outputs, or transparent inputs
// that belong to the wallet:
// - 2⃣ Add or update the transaction, and mark it as dirty.
// - Update the wallet's view of the chain tip.
// - 3⃣ In `zcashd`, this is when we decrement note witnesses.
// - For each block in the new chain, from the fork point to its tip:
// - For each transaction that became conflicted by this block:
// - If the transaction has decrypted outputs, or transparent inputs
// that belong to the wallet:
// - 5⃣ Add or update the transaction, and mark it as dirty.
// - For each transaction in the block:
// - If the transaction has decrypted outputs, or transparent inputs
// that belong to the wallet:
// - 7⃣ Add or update the transaction, and mark it as dirty.
// - Update the wallet's view of the chain tip.
// - 8⃣ In `zcashd`, this is when we increment note witnesses.
// - For each transaction in the mempool:
// - If the transaction has decrypted outputs, or transparent inputs
// that belong to the wallet:
// - 🅰️ Add or update the transaction, and mark it as dirty.
// Get a new handle to the BatchScanner for each listener in each loop.
// This allows the listeners to alter their scanning logic over time,
// for example to add new incoming viewing keys.
auto batchScanners = GetMainSignals().GetBatchScanner();
if (!batchScanners.empty()) {
// Batch the shielded outputs across all blocks being processed.
// TODO: We can probably not bother trial-decrypting transactions
// in blocks being disconnected, or that are becoming conflicted,
// instead doing a plain "is this tx in the wallet" check. However,
// the logic in AddToWalletIfInvolvingMe would need to be carefully
// checked to ensure its side-effects are correctly preserved, so
// for now we maintain the previous behaviour of trial-decrypting
// everything.
// Batch block disconnects.
auto pindexScan = pindexLastTip;
while (pindexScan && pindexScan != pindexFork) {
// Read block from disk.
CBlock block;
if (!ReadBlockFromDisk(block, pindexScan, chainParams.GetConsensus())) {
LogPrintf(
"*** %s: Failed to read block %s while collecting shielded outputs",
__func__, pindexScan->GetBlockHash().GetHex());
uiInterface.ThreadSafeMessageBox(
_("Error: A fatal internal error occurred, see debug.log for details"),
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
return;
}
// Batch transactions that went from 1-confirmed to 0-confirmed
// or conflicted.
for (const CTransaction &tx : block.vtx) {
AddTxToBatches(batchScanners, tx, pindexScan->nHeight);
}
// On to the next block!
pindexScan = pindexScan->pprev;
}
// Batch block connections. Process blockStack in the same order we
// do below, so batched work can be completed in roughly the order
// we need it.
for (auto it = blockStack.rbegin(); it != blockStack.rend(); ++it) {
const auto& blockData = *it;
// Read block from disk.
CBlock block;
if (!ReadBlockFromDisk(block, blockData.pindex, chainParams.GetConsensus())) {
LogPrintf(
"*** %s: Failed to read block %s while collecting shielded outputs from block connects",
__func__, blockData.pindex->GetBlockHash().GetHex());
uiInterface.ThreadSafeMessageBox(
_("Error: A fatal internal error occurred, see debug.log for details"),
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
return;
}
// Batch transactions that went from mempool to conflicted:
for (const CTransaction &tx : blockData.txConflicted) {
AddTxToBatches(batchScanners, tx, blockData.pindex->nHeight + 1);
}
// ... and transactions that got confirmed:
for (const CTransaction &tx : block.vtx) {
AddTxToBatches(batchScanners, tx, blockData.pindex->nHeight);
}
}
// Batch transactions in the mempool.
for (auto tx : recentlyAdded.first) {
AddTxToBatches(batchScanners, tx, pindexLastTip->nHeight + 1);
}
}
// Ensure that all pending work has been started.
FlushBatches(batchScanners);
// Notify block disconnects
while (pindexLastTip && pindexLastTip != pindexFork) {
// Read block from disk.
@ -203,7 +382,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// Let wallets know transactions went from 1-confirmed to
// 0-confirmed or conflicted:
for (const CTransaction &tx : block.vtx) {
SyncWithWallets(tx, NULL, pindexLastTip->nHeight);
SyncWithWallets(batchScanners, tx, NULL, pindexLastTip->nHeight);
}
// Update cached incremental witnesses
// This will take the cs_main lock in order to obtain the CBlockLocator
@ -237,11 +416,11 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// Tell wallet about transactions that went from mempool
// to conflicted:
for (const CTransaction &tx : blockData.txConflicted) {
SyncWithWallets(tx, NULL, blockData.pindex->nHeight + 1);
SyncWithWallets(batchScanners, tx, NULL, blockData.pindex->nHeight + 1);
}
// ... and about transactions that got confirmed:
for (const CTransaction &tx : block.vtx) {
SyncWithWallets(tx, &block, blockData.pindex->nHeight);
SyncWithWallets(batchScanners, tx, &block, blockData.pindex->nHeight);
}
// Update cached incremental witnesses
// This will take the cs_main lock in order to obtain the CBlockLocator
@ -262,7 +441,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// Notify transactions in the mempool
for (auto tx : recentlyAdded.first) {
try {
SyncWithWallets(tx, NULL, pindexLastTip->nHeight + 1);
SyncWithWallets(batchScanners, tx, NULL, pindexLastTip->nHeight + 1);
} catch (const boost::thread_interrupted&) {
throw;
} catch (const std::exception& e) {

View File

@ -24,6 +24,42 @@ class CValidationInterface;
class CValidationState;
class uint256;
class BatchScanner {
public:
/**
* Adds a transaction to the batch scanner.
*
* `block_tag` is the hash of the block that triggered this txid being added
* to the batch, or the all-zeros hash to indicate that no block triggered
* it (i.e. it was a mempool change).
*/
virtual void AddTransaction(
const CTransaction &tx,
const std::vector<unsigned char> &txBytes,
const int nHeight) = 0;
/**
* Flushes any pending batches.
*
* After calling this, every transaction passed to `AddTransaction` should
* have its result available when the matching call to `SyncTransaction` is
* made.
*/
virtual void Flush() = 0;
/**
* Notifies the batch scanner of updated transaction data (transaction, and
* optionally the block it is found in).
*
* This will be called with transactions in the same order as they were
* `AddTransaction`.
*/
virtual void SyncTransaction(
const CTransaction &tx,
const CBlock *pblock,
const int nHeight) = 0;
};
struct MerkleFrontiers {
SproutMerkleTree sprout;
SaplingMerkleTree sapling;
@ -42,6 +78,7 @@ void UnregisterAllValidationInterfaces();
class CValidationInterface {
protected:
virtual void UpdatedBlockTip(const CBlockIndex *pindex) {}
virtual BatchScanner* GetBatchScanner() { return nullptr; }
virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock, const int nHeight) {}
virtual void EraseFromWallet(const uint256 &hash) {}
virtual void ChainTip(const CBlockIndex *pindex, const CBlock *pblock, std::optional<MerkleFrontiers> added) {}
@ -56,10 +93,56 @@ protected:
friend void ::UnregisterAllValidationInterfaces();
};
// aggregate_non_null_values is a combiner which places any non-nullptr values
// returned from slots into a container.
template<typename Container>
struct aggregate_non_null_values
{
typedef Container result_type;
template<typename InputIterator>
Container operator()(InputIterator first, InputIterator last) const
{
Container values;
while (first != last) {
auto ptr = *first;
if (ptr != nullptr) {
values.push_back(ptr);
}
++first;
}
return values;
}
};
struct CMainSignals {
/** Notifies listeners of updated block chain tip */
boost::signals2::signal<void (const CBlockIndex *)> UpdatedBlockTip;
/** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */
/**
* Requests a pointer to the listener's batch scanner for shielded outputs,
* if it has one.
*
* The listener is responsible for managing the memory of the batch scanner.
* In practice each listener will have a single persistent batch scanner.
*
* This signal is called at the start of each notification loop, which runs
* on integer second boundaries. This is an opportunity for the listener to
* perform any updating of the batch scanner's internal state (such as
* updating its set of incoming viewing keys).
*
* Listeners of this signal should not listen to `SyncTransaction` or they
* will be notified about transactions twice.
*/
boost::signals2::signal<
BatchScanner* (),
aggregate_non_null_values<std::vector<BatchScanner*>>> GetBatchScanner;
/**
* Notifies listeners of updated transaction data (transaction, and optionally the block it is found in.
*
* Listeners of this signal should not listen to `GetBatchScanner` or they
* will be notified about transactions twice.
*/
boost::signals2::signal<void (const CTransaction &, const CBlock *, const int nHeight)> SyncTransaction;
/** Notifies listeners of an erased transaction (currently disabled, requires transaction replacement). */
boost::signals2::signal<void (const uint256 &)> EraseTransaction;