wallet: Refactor `ThreadNotifyWallets` to support batch memory limits

ThreadNotifyWallets was created in order to decouple wallet scanning of
transactions from the main chain updates, avoiding timing leaks to
network peers. To further ensure that not even lock contention could be
used to extract timing information, ThreadNotifyWallets collects update
information from the main chain on integer second boundaries.

In general this means that the wallet is processing the last second's
worth of blocks between each synchronization point. However, when there
are sequences of blocks that are costly for the wallet to scan, it may
take the wallet longer than a second to process a second's worth of
blocks connected to the main chain. This means that ThreadNotifyWallets
needs to wait until the next integer second boundary before collecting
the next set of updates, which means it will be processing at least two
seconds' worth of blocks. For extended periods where the chain contains
many outputs, the wallet will get progressively further behind the main
chain.

At the time that ThreadNotifyWallets was created, the above behaviour
was fine, because while the wallet scanning process consumed a lot of
CPU time, it did not consume much memory (as blocks are stored on disk).
However, we recently added batch scanning, which requires allocating
memory for each output that is being scanned. For a wallet with a
growing gap between its scanned-to height and the chain tip height, the
size of the necessary allocation will grow with each integer second
boundary crossed, until the node reaches OOM.

The solution is to implement backpressure: if we reach a memory limit
before we've finished adding blocks to the batch scanners, then we start
consuming the results from the batch scanner to free up memory space for
subsequent batches.

This commit implements the logic necessary to interleave batch creation
and batch result consumption. It does not apply any batch memory limits,
and as such should be effectively a no-op refactor.

Co-authored-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Jack Grigg 2022-09-19 22:28:55 +00:00
parent 38e79f0b63
commit 58d7eb0f2c
1 changed files with 134 additions and 72 deletions

View File

@ -291,6 +291,59 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// for example to add new incoming viewing keys.
auto batchScanners = GetMainSignals().GetBatchScanner();
// Closure that returns true if batchScanners is using less memory than
// the desired limit.
auto belowBatchMemoryLimit = [&]() {
// For now, set no limit. This should mean that the refactor to
// introduce blockStackScanned is a no-op, as we still add every
// block in blockStack to batchScanners at the start.
return true;
};
// Closure that will add a block from blockStack to batchScanners.
auto batchScanConnectedBlock = [&](const CachedBlockData& blockData) {
// Read block from disk.
CBlock block;
if (!ReadBlockFromDisk(block, blockData.pindex, chainParams.GetConsensus())) {
LogPrintf(
"*** %s: Failed to read block %s while collecting shielded outputs from block connects",
__func__, blockData.pindex->GetBlockHash().GetHex());
uiInterface.ThreadSafeMessageBox(
strprintf(_("Error: A fatal internal error occurred, see %s for details"), GetDebugLogPath()),
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
return;
}
// Batch transactions that went from mempool to conflicted:
for (const CTransaction &tx : blockData.txConflicted) {
AddTxToBatches(
batchScanners,
tx,
blockData.pindex->GetBlockHash(),
blockData.pindex->nHeight + 1);
}
// ... and transactions that got confirmed:
for (const CTransaction &tx : block.vtx) {
AddTxToBatches(
batchScanners,
tx,
blockData.pindex->GetBlockHash(),
blockData.pindex->nHeight);
}
};
// Store the iterator pointing to the next block in blockStack that
// should be added to the batch scanners. This will always be valid,
// because:
// - blockStack is only mutated via std::vector::pop_back() below this
// point, which only invalidates std::vector::end() and iterators to
// the last element of the vector.
// - We check below that blockStack.rbegin() != blockStackScanned before
// any call to blockStack.pop_back(), which ensures that we never pop
// the block that blockStackScanned is pointing to.
auto blockStackScanned = blockStack.rbegin();
if (!batchScanners.empty()) {
// Batch the shielded outputs across all blocks being processed.
// TODO: We can probably not bother trial-decrypting transactions
@ -301,7 +354,12 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// for now we maintain the previous behaviour of trial-decrypting
// everything.
// Batch block disconnects.
// Batch block disconnects. We always add every disconnected block
// to batchScanners, because this will be at most 100 blocks, which
// is at most 2GB, but in practice will be much less than this, and
// the growth problem in the interaction between this thread and the
// main node is in connected blocks. And per the above TODO, this
// might eventually be removed entirely.
auto pindexScan = pindexLastTip;
while (pindexScan && pindexScan != pindexFork) {
// Read block from disk.
@ -327,44 +385,22 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
pindexScan = pindexScan->pprev;
}
// Batch block connections. Process blockStack in the same order we
// do below, so batched work can be completed in roughly the order
// we need it.
for (auto it = blockStack.rbegin(); it != blockStack.rend(); ++it) {
const auto& blockData = *it;
// Read block from disk.
CBlock block;
if (!ReadBlockFromDisk(block, blockData.pindex, chainParams.GetConsensus())) {
LogPrintf(
"*** %s: Failed to read block %s while collecting shielded outputs from block connects",
__func__, blockData.pindex->GetBlockHash().GetHex());
uiInterface.ThreadSafeMessageBox(
strprintf(_("Error: A fatal internal error occurred, see %s for details"), GetDebugLogPath()),
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
return;
}
// Batch transactions that went from mempool to conflicted:
for (const CTransaction &tx : blockData.txConflicted) {
AddTxToBatches(
batchScanners,
tx,
blockData.pindex->GetBlockHash(),
blockData.pindex->nHeight + 1);
}
// ... and transactions that got confirmed:
for (const CTransaction &tx : block.vtx) {
AddTxToBatches(
batchScanners,
tx,
blockData.pindex->GetBlockHash(),
blockData.pindex->nHeight);
}
// Add block connections to batchScanners until we have either added
// the entirety of blockStack, or have reached the memory limit.
//
// We process blockStack in the same order we do below, so batched
// work can be completed in roughly the order we need it.
for (; blockStackScanned != blockStack.rend() && belowBatchMemoryLimit(); ++blockStackScanned) {
const auto& blockData = *blockStackScanned;
batchScanConnectedBlock(blockData);
}
// Batch transactions in the mempool.
// Batch transactions in the mempool. It's fine that we always add
// these to batchScanners; if we reached the memory limit earlier,
// it just means that the mempool results will be completed earlier
// than they are required, and will sit in memory for a bit longer.
// The ZIP 401 mempool limits are around 80MB, which is well below
// the limits we are concerned with here.
for (auto& tx : recentlyAdded.first) {
AddTxToBatches(batchScanners, tx, uint256(), pindexLastTip->nHeight + 1);
}
@ -406,45 +442,71 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// Notify block connections
while (!blockStack.empty()) {
auto& blockData = blockStack.back();
// Read block from disk.
CBlock block;
if (!ReadBlockFromDisk(block, blockData.pindex, chainParams.GetConsensus())) {
LogPrintf(
"*** %s: Failed to read block %s while notifying wallets of block connects",
__func__, blockData.pindex->GetBlockHash().GetHex());
uiInterface.ThreadSafeMessageBox(
strprintf(_("Error: A fatal internal error occurred, see %s for details"), GetDebugLogPath()),
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
return;
// Add more block connections to batchScanners until we have either
// added the rest of blockStack, or have reached the memory limit
// again. At this point, we know that blockStackScanned has not been
// invalidated by mutations to blockStack, and can be dereferenced.
for (; blockStackScanned != blockStack.rend() && belowBatchMemoryLimit(); ++blockStackScanned) {
const auto& blockData = *blockStackScanned;
batchScanConnectedBlock(blockData);
}
// Tell wallet about transactions that went from mempool
// to conflicted:
for (const CTransaction &tx : blockData.txConflicted) {
SyncWithWallets(batchScanners, tx, NULL, blockData.pindex->nHeight + 1);
}
// ... and about transactions that got confirmed:
for (const CTransaction &tx : block.vtx) {
SyncWithWallets(batchScanners, tx, &block, blockData.pindex->nHeight);
}
// Update cached incremental witnesses
// This will take the cs_main lock in order to obtain the CBlockLocator
// used by `SetBestChain`, but as that write only occurs once every
// WRITE_WITNESS_INTERVAL * 1000000 microseconds this should not be
// exploitable as a timing channel.
GetMainSignals().ChainTip(blockData.pindex, &block, blockData.oldTrees);
// Ensure that all pending work has been started.
FlushBatches(batchScanners);
// Notify UI to display prev block's coinbase if it was ours.
static uint256 hashPrevBestCoinBase;
GetMainSignals().UpdatedTransaction(hashPrevBestCoinBase);
hashPrevBestCoinBase = block.vtx[0].GetHash();
// Sync with the wallet until either:
// - there are no more blocks to sync, and we are done;
// - we have synced every block that batchScanners has been given,
// and can make no more forward progress; or
// - we have more blocks that we _could_ add to batchScanners, and
// are back below the memory limit (so should create more batches
// before continuing, for optimal pipelining).
while (!(
blockStack.empty() ||
blockStack.rbegin() == blockStackScanned ||
(blockStackScanned != blockStack.rend() && belowBatchMemoryLimit())
)) {
auto& blockData = blockStack.back();
// This block is done!
pindexLastTip = blockData.pindex;
blockStack.pop_back();
// Read block from disk.
CBlock block;
if (!ReadBlockFromDisk(block, blockData.pindex, chainParams.GetConsensus())) {
LogPrintf(
"*** %s: Failed to read block %s while notifying wallets of block connects",
__func__, blockData.pindex->GetBlockHash().GetHex());
uiInterface.ThreadSafeMessageBox(
strprintf(_("Error: A fatal internal error occurred, see %s for details"), GetDebugLogPath()),
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
return;
}
// Tell wallet about transactions that went from mempool
// to conflicted:
for (const CTransaction &tx : blockData.txConflicted) {
SyncWithWallets(batchScanners, tx, NULL, blockData.pindex->nHeight + 1);
}
// ... and about transactions that got confirmed:
for (const CTransaction &tx : block.vtx) {
SyncWithWallets(batchScanners, tx, &block, blockData.pindex->nHeight);
}
// Update cached incremental witnesses
// This will take the cs_main lock in order to obtain the CBlockLocator
// used by `SetBestChain`, but as that write only occurs once every
// WRITE_WITNESS_INTERVAL * 1000000 microseconds this should not be
// exploitable as a timing channel.
GetMainSignals().ChainTip(blockData.pindex, &block, blockData.oldTrees);
// Notify UI to display prev block's coinbase if it was ours.
static uint256 hashPrevBestCoinBase;
GetMainSignals().UpdatedTransaction(hashPrevBestCoinBase);
hashPrevBestCoinBase = block.vtx[0].GetHash();
// This block is done!
pindexLastTip = blockData.pindex;
assert(blockStack.rbegin() != blockStackScanned);
blockStack.pop_back();
}
}
// Notify transactions in the mempool