Merge pull request #6368 from nuttycom/bug/slow_wallet_shutdown

Fetch recently conflicted transactions incrementally in ThreadNotifyWallet
This commit is contained in:
Daira Hopwood 2023-01-27 21:23:47 +00:00 committed by GitHub
commit 88a57f3617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 99 additions and 75 deletions

View File

@ -42,6 +42,18 @@ heights prior to this will not have any information recorded. To track changes
from genesis, and thus monitor the total transparent pool size and chain supply,
you would need to restart your node with the `-reindex` option.
Wallet Performance Fixes
------------------------
The 100MiB memory limit for the batch scanner has been replaced by a 1000-block
limit. This eliminates an expensive call to determine the current memory usage
of the batch scanner.
The following associated metric has been removed from the set of metrics
reported when `-prometheusport` is set:
- (gauge) `zcashd.wallet.batchscanner.usage.bytes`
RPC Changes
-----------

View File

@ -4062,8 +4062,8 @@ static int64_t nTimeChainState = 0;
static int64_t nTimePostConnect = 0;
// Protected by cs_main
std::map<CBlockIndex*, std::list<CTransaction>> recentlyConflictedTxs;
uint64_t nRecentlyConflictedSequence = 0;
std::map<const CBlockIndex*, std::list<CTransaction>> recentlyConflictedTxs;
uint64_t nConnectedSequence = 0;
uint64_t nNotifiedSequence = 0;
/**
@ -4115,7 +4115,9 @@ bool static ConnectTip(CValidationState& state, const CChainParams& chainparams,
// Cache the conflicted transactions for subsequent notification.
// Updates to connected wallets are triggered by ThreadNotifyWallets
recentlyConflictedTxs.insert(std::make_pair(pindexNew, txConflicted));
nRecentlyConflictedSequence += 1;
// Increment the count of `ConnectTip` calls.
nConnectedSequence += 1;
EnforceNodeDeprecation(pindexNew->nHeight);
@ -4126,29 +4128,40 @@ bool static ConnectTip(CValidationState& state, const CChainParams& chainparams,
return true;
}
std::pair<std::map<CBlockIndex*, std::list<CTransaction>>, uint64_t> DrainRecentlyConflicted()
std::pair<std::list<CTransaction>, std::optional<uint64_t>> TakeRecentlyConflicted(const CBlockIndex* pindex)
{
uint64_t recentlyConflictedSequence;
std::map<CBlockIndex*, std::list<CTransaction>> txs;
{
LOCK(cs_main);
recentlyConflictedSequence = nRecentlyConflictedSequence;
txs.swap(recentlyConflictedTxs);
}
AssertLockHeld(cs_main);
return std::make_pair(txs, recentlyConflictedSequence);
// We use bracket notation for retrieving conflict data from recentlyConflictedTxs
// here because when a node restarts, the wallet may be behind the node's view of
// the current chain tip. The node may continue reindexing from the chain tip, but
// no entries will exist in `recentlyConflictedTxs` until the next block after the
// node's chain tip at the point of shutdown. In these cases, the wallet cannot learn
// about conflicts in those blocks (which should be fine).
std::list<CTransaction> conflictedTxs = recentlyConflictedTxs[pindex];
recentlyConflictedTxs.erase(pindex);
if (recentlyConflictedTxs.empty()) {
return std::make_pair(conflictedTxs, nConnectedSequence);
} else {
return std::make_pair(conflictedTxs, std::nullopt);
}
}
void SetChainNotifiedSequence(const CChainParams& chainparams, uint64_t recentlyConflictedSequence) {
uint64_t GetChainConnectedSequence() {
LOCK(cs_main);
return nConnectedSequence;
}
void SetChainNotifiedSequence(const CChainParams& chainparams, uint64_t connectedSequence) {
assert(chainparams.NetworkIDString() == "regtest");
LOCK(cs_main);
nNotifiedSequence = recentlyConflictedSequence;
nNotifiedSequence = connectedSequence;
}
bool ChainIsFullyNotified(const CChainParams& chainparams) {
assert(chainparams.NetworkIDString() == "regtest");
LOCK(cs_main);
return nRecentlyConflictedSequence == nNotifiedSequence;
return nConnectedSequence == nNotifiedSequence;
}
/**

View File

@ -668,7 +668,8 @@ CMutableTransaction CreateNewContextualCMutableTransaction(
int nHeight,
bool requireV4);
std::pair<std::map<CBlockIndex*, std::list<CTransaction>>, uint64_t> DrainRecentlyConflicted();
std::pair<std::list<CTransaction>, std::optional<uint64_t>> TakeRecentlyConflicted(const CBlockIndex* pindex);
uint64_t GetChainConnectedSequence();
void SetChainNotifiedSequence(const CChainParams& chainparams, uint64_t recentlyConflictedSequence);
bool ChainIsFullyNotified(const CChainParams& chainparams);

View File

@ -55,7 +55,6 @@ mod ffi {
network: &Network,
sapling_ivks: &[[u8; 32]],
) -> Result<Box<BatchScanner>>;
fn get_dynamic_usage(self: &BatchScanner) -> usize;
fn add_transaction(
self: &mut BatchScanner,
block_tag: [u8; 32],
@ -82,7 +81,6 @@ const METRIC_OUTPUTS_SCANNED: &str = "zcashd.wallet.batchscanner.outputs.scanned
const METRIC_LABEL_KIND: &str = "kind";
const METRIC_SIZE_TXS: &str = "zcashd.wallet.batchscanner.size.transactions";
const METRIC_USAGE_BYTES: &str = "zcashd.wallet.batchscanner.usage.bytes";
/// Chain parameters for the networks supported by `zcashd`.
#[derive(Clone, Copy)]
@ -787,16 +785,6 @@ fn init_batch_scanner(
}
impl BatchScanner {
/// FFI helper to access the `DynamicUsage` trait.
fn get_dynamic_usage(&self) -> usize {
let usage = self.dynamic_usage();
// Since we've measured it, we may as well update the metric.
metrics::gauge!(METRIC_USAGE_BYTES, usage as f64);
usage
}
/// Adds the given transaction's shielded outputs to the various batch runners.
///
/// `block_tag` is the hash of the block that triggered this txid being added to the
@ -833,7 +821,6 @@ impl BatchScanner {
// Update the size of the batch scanner.
metrics::increment_gauge!(METRIC_SIZE_TXS, 1.0);
metrics::gauge!(METRIC_USAGE_BYTES, self.dynamic_usage() as f64);
Ok(())
}
@ -866,7 +853,6 @@ impl BatchScanner {
// Update the size of the batch scanner.
metrics::decrement_gauge!(METRIC_SIZE_TXS, 1.0);
metrics::gauge!(METRIC_USAGE_BYTES, self.dynamic_usage() as f64);
Box::new(BatchResult { sapling })
}

View File

@ -72,16 +72,6 @@ void UnregisterAllValidationInterfaces() {
g_signals.UpdatedBlockTip.disconnect_all_slots();
}
size_t RecursiveDynamicUsage(
std::vector<BatchScanner*> &batchScanners)
{
size_t usage = 0;
for (auto& batchScanner : batchScanners) {
usage += batchScanner->RecursiveDynamicUsage();
}
return usage;
}
void AddTxToBatches(
std::vector<BatchScanner*> &batchScanners,
const CTransaction &tx,
@ -128,8 +118,6 @@ struct CachedBlockData {
void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
{
size_t nBatchScannerMemLimit = DEFAULT_BATCHSCANNERMEMLIMIT * 1024 * 1024;
// If pindexLastTip == nullptr, the wallet is at genesis.
// However, the genesis block is not loaded synchronously.
// We need to wait for ThreadImport to finish.
@ -163,8 +151,9 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// The stack of blocks we will notify as having been connected.
// Pushed in reverse, popped in order.
std::vector<CachedBlockData> blockStack;
// Transactions that have been recently conflicted out of the mempool.
std::pair<std::map<CBlockIndex*, std::list<CTransaction>>, uint64_t> recentlyConflicted;
// Sequence number indicating that we have notified wallets of transactions up to
// the ConnectBlock() call that generated this sequence number.
std::optional<uint64_t> chainNotifiedSequence;
// Transactions that have been recently added to the mempool.
std::pair<std::vector<CTransaction>, uint64_t> recentlyAdded;
@ -176,12 +165,14 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
CBlockIndex *pindex = chainActive.Tip();
pindexFork = chainActive.FindFork(pindexLastTip);
// Fetch recently-conflicted transactions. These will include any
// block that has been connected since the last cycle, but we only
// notify for the conflicts created by the current active chain.
recentlyConflicted = DrainRecentlyConflicted();
// Iterate backwards over the connected blocks until we have at
// most WALLET_NOTIFY_MAX_BLOCKS to process.
while (pindex && pindex->nHeight > pindexFork->nHeight + WALLET_NOTIFY_MAX_BLOCKS) {
pindex = pindex->pprev;
}
// Iterate backwards over the connected blocks we need to notify.
bool originalTipAtFork = pindex && pindex == pindexFork;
while (pindex && pindex != pindexFork) {
MerkleFrontiers oldFrontiers;
// Get the Sprout commitment tree as of the start of this block.
@ -214,15 +205,38 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
OrchardMerkleFrontier::empty_root(), oldFrontiers.orchard));
}
// Fetch recently-conflicted transactions. These will include any
// block that has been connected since the last cycle, but we only
// notify for the conflicts created by the current active chain.
auto recentlyConflicted = TakeRecentlyConflicted(pindex);
blockStack.emplace_back(
pindex,
oldFrontiers,
recentlyConflicted.first[pindex]);
recentlyConflicted.first);
chainNotifiedSequence = recentlyConflicted.second;
pindex = pindex->pprev;
}
recentlyAdded = mempool.DrainRecentlyAdded();
// This conditional can be true in the case that in the interval
// since the last second-boundary, two reorgs occurred: one that
// shifted over to a different chain history, and then a second
// that returned the chain to the original pre-reorg tip. This
// should never occur unless a caller has manually used
// `invalidateblock` to force the second reorg or we have a long
// persistent set of dueling chains. In such a case, wallets may
// not be fully notified of conflicted transactions, but they will
// still have a correct view of the current main chain, and they
// will still be notified properly of the current state of
// transactions in the mempool.
if (originalTipAtFork) {
chainNotifiedSequence = GetChainConnectedSequence();
}
if (chainNotifiedSequence.has_value()) {
recentlyAdded = mempool.DrainRecentlyAdded();
}
}
//
@ -307,12 +321,6 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// for example to add new incoming viewing keys.
auto batchScanners = GetMainSignals().GetBatchScanner();
// Closure that returns true if batchScanners is using less memory than
// the desired limit.
auto belowBatchMemoryLimit = [&]() {
return RecursiveDynamicUsage(batchScanners) < nBatchScannerMemLimit;
};
// Closure that will add a block from blockStack to batchScanners.
auto batchScanConnectedBlock = [&](const CachedBlockData& blockData) {
// Read block from disk.
@ -403,7 +411,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
//
// We process blockStack in the same order we do below, so batched
// work can be completed in roughly the order we need it.
for (; blockStackScanned != blockStack.rend() && belowBatchMemoryLimit(); ++blockStackScanned) {
for (; blockStackScanned != blockStack.rend(); ++blockStackScanned) {
const auto& blockData = *blockStackScanned;
batchScanConnectedBlock(blockData);
}
@ -460,7 +468,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// added the rest of blockStack, or have reached the memory limit
// again. At this point, we know that blockStackScanned has not been
// invalidated by mutations to blockStack, and can be dereferenced.
for (; blockStackScanned != blockStack.rend() && belowBatchMemoryLimit(); ++blockStackScanned) {
for (; blockStackScanned != blockStack.rend(); ++blockStackScanned) {
const auto& blockData = *blockStackScanned;
batchScanConnectedBlock(blockData);
}
@ -478,7 +486,7 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
while (!(
blockStack.empty() ||
blockStack.rbegin() == blockStackScanned ||
(blockStackScanned != blockStack.rend() && belowBatchMemoryLimit())
(blockStackScanned != blockStack.rend())
)) {
auto& blockData = blockStack.back();
@ -540,8 +548,12 @@ void ThreadNotifyWallets(CBlockIndex *pindexLastTip)
// Update the notified sequence numbers. We only need this in regtest mode,
// and should not lock on cs or cs_main here otherwise.
if (chainParams.NetworkIDString() == "regtest") {
SetChainNotifiedSequence(chainParams, recentlyConflicted.second);
mempool.SetNotifiedSequence(recentlyAdded.second);
if (chainNotifiedSequence.has_value()) {
SetChainNotifiedSequence(chainParams, chainNotifiedSequence.value());
}
if (recentlyAdded.second > 0) {
mempool.SetNotifiedSequence(recentlyAdded.second);
}
}
}
}

View File

@ -15,8 +15,11 @@
#include "miner.h"
#include "zcash/IncrementalMerkleTree.hpp"
/** Default limit on batch scanner memory usage in MiB. */
static const size_t DEFAULT_BATCHSCANNERMEMLIMIT = 100;
/**
* Limit on the maximum number of blocks that will be staged for
* scanning before an interrupt will be handled.
*/
static const size_t WALLET_NOTIFY_MAX_BLOCKS = 1000;
class CBlock;
class CBlockIndex;
@ -29,11 +32,6 @@ class uint256;
class BatchScanner {
public:
/**
* Returns the current dynamic memory usage of this batch scanner.
*/
virtual size_t RecursiveDynamicUsage() = 0;
/**
* Adds a transaction to the batch scanner.
*

View File

@ -1455,6 +1455,15 @@ void CWallet::ChainTip(const CBlockIndex *pindex,
DecrementNoteWitnesses(consensus, pindex);
UpdateSaplingNullifierNoteMapForBlock(pblock);
}
auto hash = tfm::format("%s", pindex->GetBlockHash().ToString());
auto height = tfm::format("%d", pindex->nHeight);
auto kind = tfm::format("%s", added.has_value() ? "connect" : "disconnect");
TracingInfo("wallet", "CWallet::ChainTip: processed block",
"hash", hash.c_str(),
"height", height.c_str(),
"kind", kind.c_str());
}
void CWallet::RunSaplingMigration(int blockHeight) {
@ -3597,11 +3606,6 @@ bool WalletBatchScanner::AddToWalletIfInvolvingMe(
// BatchScanner APIs
//
size_t WalletBatchScanner::RecursiveDynamicUsage()
{
return inner->get_dynamic_usage();
}
void WalletBatchScanner::AddTransaction(
const CTransaction &tx,
const std::vector<unsigned char> &txBytes,

View File

@ -1148,8 +1148,6 @@ public:
// BatchScanner APIs
//
size_t RecursiveDynamicUsage();
void AddTransaction(
const CTransaction &tx,
const std::vector<unsigned char> &txBytes,