2. fix(perf): When writing blocks to disk, don't block other async tasks (#4199)

* Only fetch block headers from the database to answer headers requests

* Move writing to the database to a blocking thread

* Add blocking threads to tests that need them

* Remove mempool downloader requirement for a populated state

Also stop populating the state in tests that don't need a populated state,
so that those tests can use `tokio::time::pause()`.

* Improve debugging for an intermittent test failure

* Try to avoid a race condition populating the mempool in tests

This commit is contained in:
teor 2022-07-22 09:16:41 +10:00 committed by GitHub
parent cbb3232769
commit 394d16a5a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 138 additions and 133 deletions

View File

@ -134,7 +134,7 @@ static STATE_VERIFY_TRANSCRIPT_GENESIS: Lazy<
)]
});
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_checkpoint_test() -> Result<(), Report> {
verify_checkpoint(Config {
checkpoint_sync: true,
@ -204,7 +204,7 @@ async fn verify_fail_no_coinbase() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn round_trip_checkpoint_test() -> Result<(), Report> {
round_trip_checkpoint().await
}
@ -229,7 +229,7 @@ async fn round_trip_checkpoint() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_fail_add_block_checkpoint_test() -> Result<(), Report> {
verify_fail_add_block_checkpoint().await
}

View File

@ -29,7 +29,7 @@ use super::{
/// high system load.
const VERIFY_TIMEOUT_SECONDS: u64 = 10;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn single_item_checkpoint_list_test() -> Result<(), Report> {
single_item_checkpoint_list().await
}
@ -100,7 +100,7 @@ async fn single_item_checkpoint_list() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn multi_item_checkpoint_list_test() -> Result<(), Report> {
multi_item_checkpoint_list().await
}
@ -207,14 +207,14 @@ async fn multi_item_checkpoint_list() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn continuous_blockchain_no_restart() -> Result<(), Report> {
continuous_blockchain(None, Mainnet).await?;
continuous_blockchain(None, Testnet).await?;
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn continuous_blockchain_restart() -> Result<(), Report> {
for height in 0..zebra_test::vectors::CONTINUOUS_MAINNET_BLOCKS.len() {
continuous_blockchain(Some(block::Height(height.try_into().unwrap())), Mainnet).await?;
@ -424,7 +424,7 @@ async fn continuous_blockchain(
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn block_higher_than_max_checkpoint_fail_test() -> Result<(), Report> {
block_higher_than_max_checkpoint_fail().await
}
@ -494,7 +494,7 @@ async fn block_higher_than_max_checkpoint_fail() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn wrong_checkpoint_hash_fail_test() -> Result<(), Report> {
wrong_checkpoint_hash_fail().await
}
@ -662,7 +662,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn checkpoint_drop_cancel_test() -> Result<(), Report> {
checkpoint_drop_cancel().await
}
@ -762,7 +762,7 @@ async fn checkpoint_drop_cancel() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn hard_coded_mainnet_test() -> Result<(), Report> {
hard_coded_mainnet().await
}

View File

@ -16,7 +16,7 @@ use zebra_test::mock_service::MockService;
use super::super::*;
/// Snapshot test for RPC methods responses.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_rpc_response_data() {
zebra_test::init();

View File

@ -20,7 +20,7 @@ use zebra_test::mock_service::MockService;
use super::super::*;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getinfo() {
zebra_test::init();
@ -53,7 +53,7 @@ async fn rpc_getinfo() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getblock() {
zebra_test::init();
@ -113,7 +113,7 @@ async fn rpc_getblock() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getblock_parse_error() {
zebra_test::init();
@ -143,7 +143,7 @@ async fn rpc_getblock_parse_error() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getblock_missing_error() {
zebra_test::init();
@ -195,7 +195,7 @@ async fn rpc_getblock_missing_error() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getbestblockhash() {
zebra_test::init();
@ -241,7 +241,7 @@ async fn rpc_getbestblockhash() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getrawtransaction() {
zebra_test::init();
@ -326,7 +326,7 @@ async fn rpc_getrawtransaction() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getaddresstxids_invalid_arguments() {
zebra_test::init();
@ -430,7 +430,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getaddresstxids_response() {
zebra_test::init();
@ -525,7 +525,7 @@ async fn rpc_getaddresstxids_response_with(
);
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getaddressutxos_invalid_arguments() {
zebra_test::init();
@ -560,7 +560,7 @@ async fn rpc_getaddressutxos_invalid_arguments() {
state.expect_no_requests().await;
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getaddressutxos_response() {
zebra_test::init();

View File

@ -779,7 +779,19 @@ impl Service<Request> for StateService {
self.pending_utxos
.check_against_ordered(&prepared.new_outputs);
let rsp_rx = self.queue_and_commit_non_finalized(prepared);
// # Performance
//
// Allow other async tasks to make progress while blocks are being verified
// and written to disk. But wait for the blocks to finish committing,
// so that `StateService` multi-block queries always observe a consistent state.
//
// Since each block is spawned into its own task,
// there shouldn't be any other code running in the same task,
// so we don't need to worry about blocking it:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html#
let rsp_rx =
tokio::task::block_in_place(|| self.queue_and_commit_non_finalized(prepared));
async move {
rsp_rx
@ -804,7 +816,16 @@ impl Service<Request> for StateService {
);
self.pending_utxos.check_against(&finalized.new_outputs);
let rsp_rx = self.queue_and_commit_finalized(finalized);
// # Performance
//
// Allow other async tasks to make progress while blocks are being verified
// and written to disk. But wait for the blocks to finish committing,
// so that `StateService` multi-block queries always observe a consistent state.
//
// See the note in `CommitBlock` for more details.
let rsp_rx =
tokio::task::block_in_place(|| self.queue_and_commit_finalized(finalized));
async move {
rsp_rx

View File

@ -460,7 +460,6 @@ impl DiskDb {
/// Writes `batch` to the database.
pub fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
// TODO: move writing to the database to a blocking thread (#2188)
self.db.write(batch.batch)
}

View File

@ -29,7 +29,7 @@ async fn empty_read_state_still_responds_to_requests() -> Result<()> {
}
/// Test that ReadStateService responds correctly when the state contains blocks.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn populated_read_state_responds_correctly() -> Result<()> {
zebra_test::init();

View File

@ -52,12 +52,12 @@ static COMMIT_FINALIZED_BLOCK_TESTNET: Lazy<
]
});
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn check_transcripts_mainnet() -> Result<(), Report> {
check_transcripts(Network::Mainnet).await
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn check_transcripts_testnet() -> Result<(), Report> {
check_transcripts(Network::Testnet).await
}

View File

@ -49,7 +49,7 @@ use InventoryResponse::*;
/// Increasing this value causes the tests to take longer to complete, so it can't be too large.
const MAX_PEER_SET_REQUEST_DELAY: Duration = Duration::from_millis(500);
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_requests_for_transactions() {
let (
inbound_service,
@ -124,7 +124,7 @@ async fn mempool_requests_for_transactions() {
);
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
// get a block that has at least one non coinbase transaction
let block: Arc<Block> =
@ -132,6 +132,7 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
// use the first transaction that is not coinbase
let tx = block.transactions[1].clone();
let test_transaction_id = tx.unmined_id();
let (
inbound_service,
@ -163,24 +164,29 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
let (push_response, _) = futures::join!(request, verification);
assert_eq!(
push_response.expect("unexpected error response from inbound service"),
Response::Nil,
"`PushTransaction` requests should always respond `Ok(Nil)`",
);
// Wait for the mempool to store the transaction
tokio::time::sleep(Duration::from_millis(100)).await;
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
let mempool_response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
assert_eq!(
mempool_response.expect("unexpected error response from mempool"),
Response::TransactionIds(vec![test_transaction_id]),
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`",
);
// Make sure there is an additional request broadcasting the
// inserted transaction to peers.
@ -208,7 +214,7 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
// get a block that has at least one non coinbase transaction
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
@ -260,27 +266,29 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
Amount::zero(),
)));
});
let (response, _, _) = futures::join!(request, peer_set_responder, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"),
};
let (advertise_response, _, _) = futures::join!(request, peer_set_responder, verification);
assert_eq!(
advertise_response.expect("unexpected error response from inbound service"),
Response::Nil,
"`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`",
);
// Wait for the mempool to store the transaction
tokio::time::sleep(Duration::from_millis(100)).await;
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
let mempool_response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![test_transaction_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
assert_eq!(
mempool_response.expect("unexpected error response from mempool"),
Response::TransactionIds(vec![test_transaction_id]),
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`",
);
// Make sure there is an additional request broadcasting the
// inserted transaction to peers.
@ -308,7 +316,7 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
// Get a block that has at least one non coinbase transaction
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
@ -357,26 +365,29 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
let (push_response, _) = futures::join!(request, verification);
assert_eq!(
push_response.expect("unexpected error response from inbound service"),
Response::Nil,
"`PushTransaction` requests should always respond `Ok(Nil)`",
);
// Wait for the mempool to store the transaction
tokio::time::sleep(Duration::from_millis(100)).await;
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
let mempool_response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx1_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
assert_eq!(
mempool_response.expect("unexpected error response from mempool"),
Response::TransactionIds(vec![tx1_id]),
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`",
);
// Add a new block to the state (make the chain tip advance)
let block_two: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
@ -456,27 +467,31 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
Amount::zero(),
)));
});
let (response, _) = futures::join!(request, verification);
match response {
Ok(Response::Nil) => (),
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
};
let (push_response, _) = futures::join!(request, verification);
assert_eq!(
push_response.expect("unexpected error response from inbound service"),
Response::Nil,
"`PushTransaction` requests should always respond `Ok(Nil)`",
);
// Wait for the mempool to store the transaction
tokio::time::sleep(Duration::from_millis(100)).await;
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
let request = inbound_service
let mempool_response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
// Only tx2 will be in the mempool while tx1 was expired
match request {
Ok(Response::TransactionIds(response)) => {
assert_eq!(response, vec![tx2_id])
}
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
assert_eq!(
mempool_response.expect("unexpected error response from mempool"),
Response::TransactionIds(vec![tx2_id]),
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`",
);
// Check if tx1 was added to the rejected list as well
let response = mempool
.clone()
@ -585,7 +600,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
/// Test that the inbound downloader rejects blocks above the lookahead limit.
///
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
// Get services
let (

View File

@ -43,7 +43,10 @@ use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::transaction::{self, UnminedTxId, VerifiedUnminedTx};
use zebra_chain::{
block::Height,
transaction::{self, UnminedTxId, VerifiedUnminedTx},
};
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
@ -103,9 +106,6 @@ pub enum TransactionDownloadVerifyError {
#[error("error in state service")]
StateError(#[source] BoxError),
#[error("transaction not validated because the tip is empty")]
NoTip,
#[error("error downloading transaction")]
DownloadFailed(#[source] BoxError),
@ -273,13 +273,16 @@ where
// Don't download/verify if the transaction is already in the state.
Self::transaction_in_state(&mut state, txid).await?;
let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err(TransactionDownloadVerifyError::NoTip),
Ok(zs::Response::Tip(Some((height, _hash)))) => Ok(height),
let next_height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Ok(Height(0)),
Ok(zs::Response::Tip(Some((height, _hash)))) => {
let next_height =
(height + 1).expect("valid heights are far below the maximum");
Ok(next_height)
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
}?;
let height = (height + 1).expect("must have next height");
let tx = match gossiped_tx {
Gossip::Id(txid) => {
@ -322,7 +325,7 @@ where
let result = verifier
.oneshot(tx::Request::Mempool {
transaction: tx.clone(),
height,
height: next_height,
})
.map_ok(|rsp| {
rsp.into_mempool_transaction()

View File

@ -457,9 +457,6 @@ impl Storage {
TransactionDownloadVerifyError::InState |
// An unknown error in the state service, better do nothing
TransactionDownloadVerifyError::StateError(_) |
// Sync has just started. Mempool shouldn't even be enabled, so will not
// happen in practice.
TransactionDownloadVerifyError::NoTip |
// If download failed, do nothing; the crawler will end up trying to
// download it again.
TransactionDownloadVerifyError::DownloadFailed(_) |

View File

@ -375,7 +375,7 @@ async fn mempool_service_disabled() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_cancel_mined() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
@ -390,8 +390,6 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
time::pause();
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
@ -470,7 +468,7 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
@ -550,7 +548,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut mempool, _peer_set, mut state_service, mut tx_verifier, mut recent_syncs) =
let (mut mempool, _peer_set, _state_service, mut tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
// Get transactions to use in the test
@ -562,20 +560,6 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
// Push the genesis block to the state, since downloader needs a valid tip.
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Queue first transaction for verification
// (queue the transaction itself to avoid a download).
let request = mempool
@ -635,7 +619,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut mempool, mut peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
let (mut mempool, mut peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network, u64::MAX).await;
// Get transactions to use in the test
@ -647,20 +631,6 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
// Push the genesis block to the state, since downloader needs a valid tip.
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Queue second transaction for download and verification.
let request = mempool
.ready()