fix(test): Fixes bugs in the lightwalletd integration tests (#9052)

* Fixes bug in send transaction test

* fixes new bug in send_transaction_test

* Removes unused `load_transactions_from_future_blocks` and factors out code for sending transactions to its own fn

* corrects tx count updates to exclude coinbase txs

* fixes formatting

* Calls zebra's sendrawtransaction method if lwd's send_transaction() return an error for more detailed error info

* removes instrument

* avoids panic when a future block has only a coinbase transaction

* fixes check for gossip log (only happens when 10 txs have been added

* fixes a concurrency bug, adds more detailed errors.

* removes unnecessary wait_for_stdout calls and fixes condition for early return

* Fixes issue around missing stdout line

* Fixes bug around expected tx ids and removes outdated TODO

* Fixes issue with expected ZF funding stream address balance in post-NU6 chains

* fixes the rest of wallet_grpc_test

* Update zebrad/src/components/mempool/downloads.rs

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
Arya 2024-12-11 09:52:59 -05:00 committed by GitHub
parent be50f7ce83
commit eb1d129fea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 197 additions and 156 deletions

View File

@ -16,15 +16,18 @@
//! were obtained. This is to ensure that zebra does not reject the transactions because they have
//! already been seen in a block.
use std::{cmp::min, sync::Arc, time::Duration};
use std::{cmp::min, collections::HashSet, sync::Arc};
use tower::BoxError;
use color_eyre::eyre::Result;
use color_eyre::eyre::{eyre, Result};
use zebra_chain::{
parameters::Network::{self, *},
block::Block,
parameters::Network::*,
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY;
use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY;
@ -34,10 +37,13 @@ use crate::common::{
lightwalletd::{
can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc,
sync::wait_for_zebrad_and_lightwalletd_sync,
wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude},
wallet_grpc::{
self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd,
Empty, Exclude,
},
},
sync::LARGE_CHECKPOINT_TIMEOUT,
test_type::TestType::{self, *},
regtest::MiningRpcMethods,
test_type::TestType::*,
};
/// The maximum number of transactions we want to send in the test.
@ -85,11 +91,19 @@ pub async fn run() -> Result<()> {
"running gRPC send transaction test using lightwalletd & zebrad",
);
let transactions =
load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?;
let mut count = 0;
let blocks: Vec<Block> =
get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.take_while(|block| {
count += block.transactions.len() - 1;
count <= max_sent_transactions()
})
.collect();
tracing::info!(
transaction_count = ?transactions.len(),
blocks_count = ?blocks.len(),
partial_sync_path = ?zebrad_state_path,
"got transactions to send, spawning isolated zebrad...",
);
@ -113,6 +127,8 @@ pub async fn run() -> Result<()> {
let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port");
let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address);
tracing::info!(
?test_type,
?zebra_rpc_address,
@ -134,7 +150,7 @@ pub async fn run() -> Result<()> {
"spawned lightwalletd connected to zebrad, waiting for them both to sync...",
);
let (_lightwalletd, mut zebrad) = wait_for_zebrad_and_lightwalletd_sync(
let (_lightwalletd, _zebrad) = wait_for_zebrad_and_lightwalletd_sync(
lightwalletd,
lightwalletd_rpc_port,
zebrad,
@ -164,6 +180,53 @@ pub async fn run() -> Result<()> {
.await?
.into_inner();
let mut transaction_hashes = HashSet::new();
let mut has_tx_with_shielded_elements = false;
let mut counter = 0;
for block in blocks {
let (has_shielded_elements, count) = send_transactions_from_block(
&mut rpc_client,
&zebrad_rpc_client,
block.clone(),
&mut transaction_hashes,
)
.await?;
has_tx_with_shielded_elements |= has_shielded_elements;
counter += count;
tracing::info!(
height = ?block.coinbase_height(),
"submitting block at height"
);
let submit_block_response = zebrad_rpc_client.submit_block(block).await;
tracing::info!(?submit_block_response, "submitted block");
}
// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements \
from future blocks in mempool via lightwalletd"
);
Ok(())
}
/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions
/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate.
///
/// Returns the zebrad test child that's handling the RPC requests.
#[tracing::instrument(skip_all)]
async fn send_transactions_from_block(
rpc_client: &mut CompactTxStreamerClient<tonic::transport::Channel>,
zebrad_rpc_client: &RpcRequestClient,
block: Block,
transaction_hashes: &mut HashSet<transaction::Hash>,
) -> Result<(bool, usize)> {
// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
@ -171,8 +234,17 @@ pub async fn run() -> Result<()> {
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));
let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();
let transactions: Vec<_> = block
.transactions
.iter()
.filter(|tx| !tx.is_coinbase())
.collect();
if transactions.is_empty() {
return Ok((false, 0));
}
transaction_hashes.extend(transactions.iter().map(|tx| tx.hash()));
tracing::info!(
transaction_count = ?transactions.len(),
@ -181,7 +253,7 @@ pub async fn run() -> Result<()> {
);
let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
for &transaction in &transactions {
let transaction_hash = transaction.hash();
// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
@ -195,20 +267,24 @@ pub async fn run() -> Result<()> {
tracing::info!(?transaction_hash, "sending transaction...");
let request = prepare_send_transaction_request(transaction);
let request = prepare_send_transaction_request(transaction.clone());
let response = rpc_client.send_transaction(request).await?.into_inner();
match rpc_client.send_transaction(request).await {
Ok(response) => assert_eq!(response.into_inner(), expected_response),
Err(err) => {
tracing::warn!(?err, "failed to send transaction");
let send_tx_rsp = zebrad_rpc_client
.send_transaction(transaction)
.await
.map_err(|e| eyre!(e));
assert_eq!(response, expected_response);
tracing::warn!(?send_tx_rsp, "failed to send tx twice");
}
};
}
// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
sleep_until_lwd_last_mempool_refresh.await;
tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
@ -217,25 +293,6 @@ pub async fn run() -> Result<()> {
.await?
.into_inner();
// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
tracing::info!("checking if lightwalletd has queried the mempool...");
// We need a short timeout here, because sometimes this message is not logged.
zebrad = zebrad.with_timeout(Duration::from_secs(60));
let tx_log =
zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds");
// Reset the failed timeout and give the rest of the test enough time to finish.
#[allow(unused_assignments)]
{
zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT);
}
if tx_log.is_err() {
tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks");
return Ok(());
}
tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
@ -251,16 +308,6 @@ pub async fn run() -> Result<()> {
counter += 1;
}
// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd"
);
// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();
@ -270,32 +317,7 @@ pub async fn run() -> Result<()> {
_counter += 1;
}
Ok(())
}
/// Loads transactions from a few block(s) after the chain tip of the cached state.
///
/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk
/// in the `ZEBRA_CACHED_STATE_DIR`.
///
/// ## Panics
///
/// If the provided `test_type` doesn't need an rpc server and cached state
#[tracing::instrument]
async fn load_transactions_from_future_blocks(
network: Network,
test_type: TestType,
test_name: &str,
) -> Result<Vec<Arc<Transaction>>> {
let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.flat_map(|block| block.transactions)
.filter(|transaction| !transaction.is_coinbase())
.take(max_sent_transactions())
.collect();
Ok(transactions)
Ok((has_tx_with_shielded_elements, counter))
}
/// Prepare a request to send to lightwalletd that contains a transaction to be sent.
@ -307,3 +329,21 @@ fn prepare_send_transaction_request(transaction: Arc<Transaction>) -> wallet_grp
height: 0,
}
}
trait SendTransactionMethod {
async fn send_transaction(
&self,
transaction: &Arc<Transaction>,
) -> Result<zebra_rpc::methods::SentTransactionHash, BoxError>;
}
impl SendTransactionMethod for RpcRequestClient {
async fn send_transaction(
&self,
transaction: &Arc<Transaction>,
) -> Result<zebra_rpc::methods::SentTransactionHash, BoxError> {
let tx_data = hex::encode(transaction.zcash_serialize_to_vec()?);
self.json_result_from_call("sendrawtransaction", format!(r#"["{tx_data}"]"#))
.await
}
}

View File

@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync<
wait_for_zebrad_mempool: bool,
wait_for_zebrad_tip: bool,
) -> Result<(TestChild<TempDir>, TestChild<P>)> {
let is_zebrad_finished = AtomicBool::new(false);
let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip);
let is_lightwalletd_finished = AtomicBool::new(false);
let is_zebrad_finished = &is_zebrad_finished;

View File

@ -37,11 +37,14 @@ use color_eyre::eyre::Result;
use hex_literal::hex;
use zebra_chain::{
block::Block,
parameters::Network,
parameters::NetworkUpgrade::{Nu5, Sapling},
block::{Block, Height},
parameters::{
Network,
NetworkUpgrade::{Nu5, Sapling},
},
serialization::ZcashDeserializeInto,
};
use zebra_consensus::funding_stream_address;
use zebra_state::state_database_format_version_in_code;
use crate::common::{
@ -291,60 +294,89 @@ pub async fn run() -> Result<()> {
// For the provided address in the first 10 blocks there are 10 transactions in the mainnet
assert_eq!(10, counter);
// Call `GetTaddressBalance` with the ZF funding stream address
let balance = rpc_client
.get_taddress_balance(AddressList {
addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()],
})
.await?
.into_inner();
let lwd_tip_height: Height = u32::try_from(block_tip.height)
.expect("should be below max block height")
.try_into()
.expect("should be below max block height");
// With ZFND or Major Grants funding stream address, the balance will always be greater than zero,
// because new coins are created in each block
assert!(balance.value_zat > 0);
let mut all_stream_addresses = Vec::new();
let mut all_balance_streams = Vec::new();
for &fs_receiver in network.funding_streams(lwd_tip_height).recipients().keys() {
let Some(fs_address) = funding_stream_address(lwd_tip_height, &network, fs_receiver) else {
// Skip if the lightwalletd tip height is above the funding stream end height.
continue;
};
// Call `GetTaddressBalanceStream` with the ZF funding stream address as a stream argument
let zf_stream_address = Address {
address: "t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string(),
};
tracing::info!(?fs_address, "getting balance for active fs address");
let balance_zf = rpc_client
.get_taddress_balance_stream(tokio_stream::iter(vec![zf_stream_address.clone()]))
.await?
.into_inner();
// Call `GetTaddressBalance` with the active funding stream address.
let balance = rpc_client
.get_taddress_balance(AddressList {
addresses: vec![fs_address.to_string()],
})
.await?
.into_inner();
// With ZFND funding stream address, the balance will always be greater than zero,
// because new coins are created in each block
assert!(balance_zf.value_zat > 0);
// Call `GetTaddressBalanceStream` with the active funding stream address as a stream argument.
let stream_address = Address {
address: fs_address.to_string(),
};
// Call `GetTaddressBalanceStream` with the MG funding stream address as a stream argument
let mg_stream_address = Address {
address: "t3XyYW8yBFRuMnfvm5KLGFbEVz25kckZXym".to_string(),
};
let balance_stream = rpc_client
.get_taddress_balance_stream(tokio_stream::iter(vec![stream_address.clone()]))
.await?
.into_inner();
let balance_mg = rpc_client
.get_taddress_balance_stream(tokio_stream::iter(vec![mg_stream_address.clone()]))
.await?
.into_inner();
// With any active funding stream address, the balance will always be greater than zero for blocks
// below the funding stream end height because new coins are created in each block.
assert!(balance.value_zat > 0);
assert!(balance_stream.value_zat > 0);
// With Major Grants funding stream address, the balance will always be greater than zero,
// because new coins are created in each block
assert!(balance_mg.value_zat > 0);
all_stream_addresses.push(stream_address);
all_balance_streams.push(balance_stream.value_zat);
// Call `GetTaddressBalanceStream` with both, the ZFND and the MG funding stream addresses as a stream argument
let balance_both = rpc_client
.get_taddress_balance_stream(tokio_stream::iter(vec![
zf_stream_address,
mg_stream_address,
]))
.await?
.into_inner();
// Call `GetAddressUtxos` with the active funding stream address that will always have utxos
let utxos = rpc_client
.get_address_utxos(GetAddressUtxosArg {
addresses: vec![fs_address.to_string()],
start_height: 1,
max_entries: 1,
})
.await?
.into_inner();
// The result is the sum of the values in both addresses
assert_eq!(
balance_both.value_zat,
balance_zf.value_zat + balance_mg.value_zat
);
// As we requested one entry we should get a response of length 1
assert_eq!(utxos.address_utxos.len(), 1);
// Call `GetAddressUtxosStream` with the active funding stream address that will always have utxos
let mut utxos_zf = rpc_client
.get_address_utxos_stream(GetAddressUtxosArg {
addresses: vec![fs_address.to_string()],
start_height: 1,
max_entries: 2,
})
.await?
.into_inner();
let mut counter = 0;
while let Some(_utxos) = utxos_zf.message().await? {
counter += 1;
}
// As we are in a "in sync" chain we know there are more than 2 utxos for this address (coinbase maturity rule)
// but we will receive the max of 2 from the stream response because we used a limit of 2 `max_entries`.
assert_eq!(2, counter);
}
if let Some(expected_total_balance) = all_balance_streams.into_iter().reduce(|a, b| a + b) {
// Call `GetTaddressBalanceStream` for all active funding stream addresses as a stream argument.
let total_balance = rpc_client
.get_taddress_balance_stream(tokio_stream::iter(all_stream_addresses))
.await?
.into_inner();
// The result should be the sum of the values in all active funding stream addresses.
assert_eq!(total_balance.value_zat, expected_total_balance);
}
let sapling_treestate_init_height = sapling_activation_height + 1;
@ -374,37 +406,6 @@ pub async fn run() -> Result<()> {
*zebra_test::vectors::SAPLING_TREESTATE_MAINNET_419201_STRING
);
// Call `GetAddressUtxos` with the ZF funding stream address that will always have utxos
let utxos = rpc_client
.get_address_utxos(GetAddressUtxosArg {
addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()],
start_height: 1,
max_entries: 1,
})
.await?
.into_inner();
// As we requested one entry we should get a response of length 1
assert_eq!(utxos.address_utxos.len(), 1);
// Call `GetAddressUtxosStream` with the ZF funding stream address that will always have utxos
let mut utxos_zf = rpc_client
.get_address_utxos_stream(GetAddressUtxosArg {
addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()],
start_height: 1,
max_entries: 2,
})
.await?
.into_inner();
let mut counter = 0;
while let Some(_utxos) = utxos_zf.message().await? {
counter += 1;
}
// As we are in a "in sync" chain we know there are more than 2 utxos for this address
// but we will receive the max of 2 from the stream response because we used a limit of 2 `max_entries`.
assert_eq!(2, counter);
// Call `GetLightdInfo`
let lightd_info = rpc_client.get_lightd_info(Empty {}).await?.into_inner();