fix(perf): Run CPU-intensive state reads in parallel rayon threads (#4805)

* Split disk reads from CPU-heavy Sprout interstitial tree cryptography

* Improve anchor validation debugging and error messages

* Work around a test data bug, and save some CPU

* Remove redundant checks for empty shielded data

* Skip generating unused interstitial treestates

* Do disk fetches and quick checks, then CPU-heavy cryptography

* Wrap HistoryTree in an Arc in the state

* Run CPU-intensive chain validation and updates in parallel rayon threads

* Refactor to prepare for parallel tree root calculations

* Run finalized state note commitment tree root updates in parallel rayon threads

* Update finalized state note commitment trees using parallel rayon threads

* Fix a comment typo and add a TODO

* Split sprout treestate fetch into its own function

* Move parallel note commitment trees to zebra-chain

* Re-calculate the tree roots in the same parallel batches

* Do non-finalized note commitment tree updates in parallel threads

* Update comments about note commitment tree rebuilds

* Do post-fork tree updates in parallel threads

* Add a TODO for parallel tree updates in tests

* Fix broken intra-doc links

* Clarify documentation for sprout treestates

* Spawn large database reads into blocking tokio threads

* Concurrently read all blocks, headers, and transactions from disk

* Run zebra-state transaction deserialization on a rayon thread
teor 2022-07-23 02:25:32 +10:00 committed by GitHub
parent 4257f60265
commit f81e997090
8 changed files with 135 additions and 70 deletions
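
The diffs below apply two recurring patterns: blocking disk reads move onto tokio's blocking thread pool via tokio::task::spawn_blocking, and CPU-intensive cryptography moves onto the rayon thread pool via rayon::in_place_scope_fifo (sketched after the transaction deserialization diff further down). A minimal sketch of the first pattern, where Db and read_bytes are hypothetical stand-ins for a cloneable ZebraDb handle and a blocking disk read, assuming tokio and futures as dependencies:

use futures::FutureExt;

// Hypothetical stand-in for a cheaply cloneable database handle like ZebraDb.
#[derive(Clone)]
struct Db;

impl Db {
    // Stand-in for a blocking disk read.
    fn read_bytes(&self, key: u64) -> Option<Vec<u8>> {
        Some(key.to_be_bytes().to_vec())
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let db = Db;

    // Clone the handle so the spawned closure is 'static, then run the
    // blocking read on tokio's blocking pool while other async tasks progress.
    let db = db.clone();
    let block = tokio::task::spawn_blocking(move || db.read_bytes(42))
        // Propagate panics from the blocking task, as the service code does.
        .map(|join_result| join_result.expect("panic in blocking read"))
        .await;

    assert!(block.is_some());
}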

View File

@@ -82,6 +82,7 @@ rand_chacha = { version = "0.3.1", optional = true }
tokio = { version = "1.20.0", features = ["tracing"], optional = true }
zebra-test = { path = "../zebra-test/", optional = true }
rayon = "1.5.3"
[dev-dependencies]

View File

@@ -176,7 +176,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_fail_no_coinbase_test() -> Result<(), Report> {
verify_fail_no_coinbase().await
}
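
This flavor change, repeated in several test files below, opts the tests into tokio's multi-threaded runtime so the async test body keeps making progress while the state service performs blocking reads. A minimal sketch of such a test, assuming tokio's macros and rt-multi-thread features are enabled:

#[tokio::test(flavor = "multi_thread")]
async fn blocking_read_completes() -> Result<(), Box<dyn std::error::Error>> {
    // spawn_blocking runs on the blocking pool; the multi-threaded runtime
    // keeps driving other async tasks while this test awaits the result.
    let result = tokio::task::spawn_blocking(|| 2 + 2).await?;
    assert_eq!(result, 4);
    Ok(())
}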

View File

@@ -33,15 +33,12 @@ use tracing::instrument;
use tower::buffer::Buffer;
use zebra_chain::{
block::{self, Block},
block,
parameters::{Network, NetworkUpgrade},
transaction,
transaction::Transaction,
transparent,
};
use crate::{
request::HashOrHeight,
service::{
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
finalized_state::{FinalizedState, ZebraDb},
@@ -471,24 +468,6 @@ impl StateService {
Some(tip.0 - height.0)
}
/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), if it exists in the current best chain.
pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
read::block(self.mem.best_chain(), self.disk.db(), hash_or_height)
}
/// Returns the [`block::Header`] with [`Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), if it exists in the current best chain.
pub fn best_block_header(&self, hash_or_height: HashOrHeight) -> Option<Arc<block::Header>> {
read::block_header(self.mem.best_chain(), self.disk.db(), hash_or_height)
}
/// Returns the [`Transaction`] with [`transaction::Hash`],
/// if it exists in the current best chain.
pub fn best_transaction(&self, hash: transaction::Hash) -> Option<Arc<Transaction>> {
read::transaction(self.mem.best_chain(), self.disk.db(), hash).map(|(tx, _height)| tx)
}
/// Return the hash for the block at `height` in the current best chain.
pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
self.mem
@@ -820,8 +799,7 @@ impl Service<Request> for StateService {
// # Performance
//
// Allow other async tasks to make progress while blocks are being verified
// and written to disk. But wait for the blocks to finish committing,
// so that `StateService` multi-block queries always observe a consistent state.
// and written to disk.
//
// See the note in `CommitBlock` for more details.
let rsp_rx =
@@ -851,9 +829,11 @@
"type" => "depth",
);
let rsp = Ok(self.best_depth(hash)).map(Response::Depth);
let rsp = Ok(Response::Depth(self.best_depth(hash)));
async move { rsp }.boxed()
}
// TODO: consider spawning small reads into blocking tasks,
// because the database can do large cleanups during small reads.
Request::Tip => {
metrics::counter!(
"state.requests",
@@ -862,7 +842,7 @@
"type" => "tip",
);
let rsp = Ok(self.best_tip()).map(Response::Tip);
let rsp = Ok(Response::Tip(self.best_tip()));
async move { rsp }.boxed()
}
Request::BlockLocator => {
@@ -873,7 +853,9 @@
"type" => "block_locator",
);
let rsp = Ok(self.block_locator().unwrap_or_default()).map(Response::BlockLocator);
let rsp = Ok(Response::BlockLocator(
self.block_locator().unwrap_or_default(),
));
async move { rsp }.boxed()
}
Request::Transaction(hash) => {
@@ -884,8 +866,20 @@
"type" => "transaction",
);
let rsp = Ok(self.best_transaction(hash)).map(Response::Transaction);
async move { rsp }.boxed()
// Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// # Performance
//
// Allow other async tasks to make progress while the transaction is being read from disk.
tokio::task::spawn_blocking(move || {
let rsp = read::transaction(best_chain, &db, hash);
Ok(Response::Transaction(rsp.map(|(tx, _height)| tx)))
})
.map(|join_result| join_result.expect("panic in Request::Transaction"))
.boxed()
}
Request::Block(hash_or_height) => {
metrics::counter!(
@@ -895,8 +889,20 @@
"type" => "block",
);
let rsp = Ok(self.best_block(hash_or_height)).map(Response::Block);
async move { rsp }.boxed()
// Prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// # Performance
//
// Allow other async tasks to make progress while the block is being read from disk.
tokio::task::spawn_blocking(move || {
let rsp = read::block(best_chain, &db, hash_or_height);
Ok(Response::Block(rsp))
})
.map(|join_result| join_result.expect("panic in Request::Block"))
.boxed()
}
Request::AwaitUtxo(outpoint) => {
metrics::counter!(
@@ -935,6 +941,8 @@
"type" => "find_block_headers",
);
// Before we spawn the future, get a consistent set of chain hashes from the state.
const MAX_FIND_BLOCK_HEADERS_RESULTS: usize = 160;
// Zcashd will blindly request more block headers as long as it
// got 160 block headers in response to a previous query, EVEN
@@ -944,16 +952,30 @@
// https://github.com/bitcoin/bitcoin/pull/4468/files#r17026905
let count = MAX_FIND_BLOCK_HEADERS_RESULTS - 2;
let res = self.find_best_chain_hashes(known_blocks, stop, count);
let res: Vec<_> = res
.iter()
.map(|&hash| {
let header = self
.best_block_header(hash.into())
.expect("block header for found hash is in the best chain");
block::CountedHeader { header }
})
.collect();
async move { Ok(Response::BlockHeaders(res)) }.boxed()
// And prepare data for concurrent execution
let best_chain = self.mem.best_chain().cloned();
let db = self.disk.db().clone();
// # Performance
//
// Now that we have the chain hashes, we can read the headers concurrently,
// which allows other async tasks to make progress while data is being read from disk.
tokio::task::spawn_blocking(move || {
let res = res
.iter()
.map(|&hash| {
let header = read::block_header(best_chain.clone(), &db, hash.into())
.expect("block header for found hash is in the best chain");
block::CountedHeader { header }
})
.collect();
Ok(Response::BlockHeaders(res))
})
.map(|join_result| join_result.expect("panic in Request::FindBlockHeaders"))
.boxed()
}
}
}
@@ -983,13 +1005,17 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading blocks from disk.
tokio::task::spawn_blocking(move || {
let block = state.best_chain_receiver.with_watch_data(|best_chain| {
read::block(best_chain, &state.db, hash_or_height)
});
Ok(ReadResponse::Block(block))
}
})
.map(|join_result| join_result.expect("panic in ReadRequest::Block"))
.boxed()
}
@@ -1004,14 +1030,18 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading transactions from disk.
tokio::task::spawn_blocking(move || {
let transaction_and_height =
state.best_chain_receiver.with_watch_data(|best_chain| {
read::transaction(best_chain, &state.db, hash)
});
Ok(ReadResponse::Transaction(transaction_and_height))
}
})
.map(|join_result| join_result.expect("panic in ReadRequest::Transaction"))
.boxed()
}
@@ -1025,13 +1055,17 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading trees from disk.
tokio::task::spawn_blocking(move || {
let sapling_tree = state.best_chain_receiver.with_watch_data(|best_chain| {
read::sapling_tree(best_chain, &state.db, hash_or_height)
});
Ok(ReadResponse::SaplingTree(sapling_tree))
}
})
.map(|join_result| join_result.expect("panic in ReadRequest::SaplingTree"))
.boxed()
}
@@ -1045,13 +1079,17 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading trees from disk.
tokio::task::spawn_blocking(move || {
let orchard_tree = state.best_chain_receiver.with_watch_data(|best_chain| {
read::orchard_tree(best_chain, &state.db, hash_or_height)
});
Ok(ReadResponse::OrchardTree(orchard_tree))
}
})
.map(|join_result| join_result.expect("panic in ReadRequest::OrchardTree"))
.boxed()
}
@@ -1069,13 +1107,19 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading transaction IDs from disk.
tokio::task::spawn_blocking(move || {
let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_tx_ids(best_chain, &state.db, addresses, height_range)
});
tx_ids.map(ReadResponse::AddressesTransactionIds)
}
})
.map(|join_result| {
join_result.expect("panic in ReadRequest::TransactionIdsByAddresses")
})
.boxed()
}
@@ -1090,13 +1134,17 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading balances from disk.
tokio::task::spawn_blocking(move || {
let balance = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_balance(best_chain, &state.db, addresses)
})?;
Ok(ReadResponse::AddressBalance(balance))
}
})
.map(|join_result| join_result.expect("panic in ReadRequest::AddressBalance"))
.boxed()
}
@@ -1111,13 +1159,17 @@ impl Service<ReadRequest> for ReadStateService {
let state = self.clone();
async move {
// # Performance
//
// Allow other async tasks to make progress while concurrently reading UTXOs from disk.
tokio::task::spawn_blocking(move || {
let utxos = state.best_chain_receiver.with_watch_data(|best_chain| {
read::transparent_utxos(state.network, best_chain, &state.db, addresses)
});
utxos.map(ReadResponse::Utxos)
}
})
.map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses"))
.boxed()
}
}
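
Each ReadRequest arm above runs its read inside state.best_chain_receiver.with_watch_data(...), so the whole read observes a single consistent snapshot of the non-finalized best chain. A hypothetical reduction of that helper to a plain tokio watch channel (Zebra's real receiver type is richer than this sketch):

use tokio::sync::watch;

// Hypothetical stand-in for with_watch_data: borrow the current watch value
// and run the read closure against that one consistent snapshot.
fn with_watch_data<T, R>(receiver: &watch::Receiver<T>, f: impl FnOnce(&T) -> R) -> R {
    f(&receiver.borrow())
}

fn main() {
    let (_sender, receiver) = watch::channel(vec![1u32, 2, 3]);
    // The closure sees one snapshot, even if a sender updates concurrently.
    let len = with_watch_data(&receiver, |best_chain| best_chain.len());
    assert_eq!(len, 3);
}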

View File

@@ -229,10 +229,23 @@ impl IntoDisk for Transaction {
impl FromDisk for Transaction {
fn from_bytes(bytes: impl AsRef<[u8]>) -> Self {
bytes
.as_ref()
.zcash_deserialize_into()
.expect("deserialization format should match the serialization format used by IntoDisk")
let bytes = bytes.as_ref();
let mut tx = None;
// # Performance
//
// Move CPU-intensive deserialization cryptography into the rayon thread pool.
// This avoids blocking the tokio executor.
rayon::in_place_scope_fifo(|scope| {
scope.spawn_fifo(|_scope| {
tx = Some(bytes.as_ref().zcash_deserialize_into().expect(
"deserialization format should match the serialization format used by IntoDisk",
));
});
});
tx.expect("scope has already run")
}
}
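
For context on the pattern above: rayon::in_place_scope_fifo runs the spawned closure on rayon's global thread pool and blocks the calling thread until the scope finishes, which is why tx can be filled in through a plain mutable borrow. A standalone sketch of the same shape, where parse is a hypothetical stand-in for the CPU-intensive zcash_deserialize_into call:

// Stand-in for CPU-intensive deserialization.
fn parse(bytes: &[u8]) -> usize {
    bytes.len()
}

fn parse_on_rayon(bytes: &[u8]) -> usize {
    let mut result = None;
    // The spawned task borrows bytes and result from this stack frame; this is
    // sound because in_place_scope_fifo joins all spawned work before returning.
    rayon::in_place_scope_fifo(|scope| {
        scope.spawn_fifo(|_scope| result = Some(parse(bytes)));
    });
    result.expect("scope has already run")
}

fn main() {
    assert_eq!(parse_on_rayon(b"zebra"), 5);
}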

View File

@@ -111,8 +111,11 @@ impl ZebraDb {
// Manually fetch the entire block's transactions
let mut transactions = Vec::new();
// TODO: is this loop more efficient if we store the number of transactions?
// is the difference large enough to matter?
// TODO:
// - split disk reads from deserialization, and run deserialization in parallel;
// this improves performance for blocks with multiple large shielded transactions
// - is this loop more efficient if we store the number of transactions?
// - is the difference large enough to matter?
for tx_index in 0..=Transaction::max_allocation() {
let tx_loc = TransactionLocation::from_u64(height, tx_index);

View File

@@ -208,7 +208,7 @@ fn out_of_order_committing_strategy() -> BoxedStrategy<Vec<Arc<Block>>> {
Just(blocks).prop_shuffle().boxed()
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn empty_state_still_responds_to_requests() -> Result<()> {
zebra_test::init();

View File

@@ -120,7 +120,7 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> {
/// Check that a network stack with an empty state responds to block requests with `notfound`.
///
/// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> {
let (
// real services

View File

@@ -543,7 +543,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
}
/// Check if a transaction that fails verification is rejected by the mempool.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
@@ -555,8 +555,6 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
let rejected_tx = unmined_transactions.next().unwrap().clone();
time::pause();
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
@@ -614,7 +612,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
}
/// Check if a transaction that fails download is _not_ rejected.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
@@ -626,8 +624,6 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
let rejected_valid_tx = unmined_transactions.next().unwrap().clone();
time::pause();
// Enable the mempool
mempool.enable(&mut recent_syncs).await;