fix(ci): fix hangs in lightwalletd tests by checking concurrent process output in different threads (#4828)

* Make code execution time logs shorter

* Do ZK parameter preloads in the lightwalletd tests that need them

* Try to re-launch `lightwalletd` when it hangs during sync tests

* Increase full sync timeout

* Clear the `zebrad` logs during `lightwalletd` tests, to avoid logging deadlocks

* Actually clear more than one line of logs

* Check zebrad and lightwalletd output in parallel threads, while waiting for zebrad

* Check zebrad and lightwalletd output in parallel threads, while waiting for lightwalletd

* Improve test logging

* Fix a log typo

* Only wait for lightwalletd once, because its logs stop after the initial sync

* Look for cached state disks for this commit and branch first

* Only copy the state once in the send transactions test

* Wait longer for lightwalletd gRPC server startup

* Add some function docs

* cargo fmt --all
This commit is contained in:
teor 2022-07-29 07:06:18 +10:00 committed by GitHub
parent 404f682af6
commit 89a0410e23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 151 additions and 147 deletions

View File

@ -256,7 +256,7 @@ jobs:
# - To ${{ inputs.zebra_state_dir || inputs.disk_prefix }} if not
#
# If there are multiple disks:
# - prefer images generated from the `main` branch, then any other branch
# - prefer images generated from this branch, then the `main` branch, then any other branch
# - prefer newer images to older images
#
# Passes the disk name to subsequent steps using $CACHED_DISK_NAME env variable
@ -273,20 +273,28 @@ jobs:
DISK_PREFIX=${{ inputs.zebra_state_dir || inputs.disk_prefix }}
fi
# Try to find an image generated from the main branch
# Try to find an image generated from this branch and commit
# Fields are listed in the "Create image from state disk" step
CACHED_DISK_NAME=$(gcloud compute images list --filter="name~${DISK_PREFIX}-main-[0-9a-f]+-v${LOCAL_STATE_VERSION}-${NETWORK}-${{ inputs.disk_suffix }}" --format="value(NAME)" --sort-by=~creationTimestamp --limit=1)
echo "main Disk: $CACHED_DISK_NAME"
BRANCH_DISK_NAME="${DISK_PREFIX}-${GITHUB_REF_SLUG_URL}-${GITHUB_SHA_SHORT}-v${LOCAL_STATE_VERSION}-${NETWORK}-${{ inputs.disk_suffix }}"
CACHED_DISK_NAME=$(gcloud compute images list --filter="name~${BRANCH_DISK_NAME}" --format="value(NAME)" --sort-by=~creationTimestamp --limit=1)
echo "${GITHUB_REF_SLUG_URL}-${GITHUB_SHA_SHORT} Disk: $CACHED_DISK_NAME"
if [[ -z "$CACHED_DISK_NAME" ]]; then
# Try to find an image generated from the main branch
CACHED_DISK_NAME=$(gcloud compute images list --filter="name~${DISK_PREFIX}-main-[0-9a-f]+-v${LOCAL_STATE_VERSION}-${NETWORK}-${{ inputs.disk_suffix }}" --format="value(NAME)" --sort-by=~creationTimestamp --limit=1)
echo "main Disk: $CACHED_DISK_NAME"
fi
if [[ -z "$CACHED_DISK_NAME" ]]; then
# Try to find an image generated from any other branch
CACHED_DISK_NAME=$(gcloud compute images list --filter="name~${DISK_PREFIX}-.+-[0-9a-f]+-v${LOCAL_STATE_VERSION}-${NETWORK}-${{ inputs.disk_suffix }}" --format="value(NAME)" --sort-by=~creationTimestamp --limit=1)
echo "Disk: $CACHED_DISK_NAME"
echo "any branch Disk: $CACHED_DISK_NAME"
fi
if [[ -z "$CACHED_DISK_NAME" ]]; then
echo "No cached state disk available"
echo "Expected ${DISK_PREFIX}-(branch)-[0-9a-f]+-v${LOCAL_STATE_VERSION}-${NETWORK}-${{ inputs.disk_suffix }}"
echo "Expected ${BRANCH_DISK_NAME}"
echo "Also searched for any commit on main, and any commit on any branch"
echo "Cached state test jobs must depend on the cached state rebuild job"
exit 1
fi

View File

@ -36,7 +36,11 @@ impl CodeTimer {
#[track_caller]
pub fn start() -> Self {
let start = Instant::now();
trace!(?start, "starting code timer");
trace!(
target: "run time",
?start,
"started code timer",
);
Self {
start,
@ -47,7 +51,6 @@ impl CodeTimer {
}
/// Finish timing the execution of a function, method, or other code region.
#[track_caller]
pub fn finish<S>(
mut self,
module_path: &'static str,
@ -62,7 +65,6 @@ impl CodeTimer {
/// Finish timing the execution of a function, method, or other code region.
///
/// This private method can be called from [`CodeTimer::finish()`] or `drop()`.
#[track_caller]
fn finish_inner<S>(
&mut self,
module_path: impl Into<Option<&'static str>>,
@ -78,42 +80,48 @@ impl CodeTimer {
self.has_finished = true;
let execution = self.start.elapsed();
let execution_time = duration_short(execution);
let time = duration_short(execution);
let time = time.as_str();
let module = module_path.into().unwrap_or_default();
let line = line.into().map(|line| line.to_string()).unwrap_or_default();
let line = line.as_str();
let module_path = module_path.into();
let line = line.into();
let description = description
.into()
.map(|desc| desc.to_string() + " ")
.map(|desc| desc.to_string())
.unwrap_or_default();
if execution >= self.min_warn_time {
warn!(
?execution_time,
?module_path,
?line,
"{description}code took a long time to execute",
target: "run time",
%time,
%module,
%line,
"very long {description}",
);
} else if execution >= self.min_info_time {
info!(
?execution_time,
?module_path,
?line,
"{description}code took longer than expected to execute",
target: "run time",
%time,
%module,
%line,
"long {description}",
);
} else {
trace!(
?execution_time,
?module_path,
?line,
"{description}code timer finished",
target: "run time",
%time,
%module,
%line,
"finished {description} code timer",
);
}
}
}
impl Drop for CodeTimer {
#[track_caller]
fn drop(&mut self) {
self.finish_inner(None, None, "(dropped, cancelled, or aborted)")
}

View File

@ -194,10 +194,10 @@ pub struct TestChild<T> {
/// The standard output stream of the child process.
///
/// TODO: replace with `Option<ChildOutput { stdout, stderr }>`.
pub stdout: Option<Box<dyn IteratorDebug<Item = std::io::Result<String>>>>,
pub stdout: Option<Box<dyn IteratorDebug<Item = std::io::Result<String>> + Send>>,
/// The standard error stream of the child process.
pub stderr: Option<Box<dyn IteratorDebug<Item = std::io::Result<String>>>>,
pub stderr: Option<Box<dyn IteratorDebug<Item = std::io::Result<String>> + Send>>,
/// Command outputs which indicate test failure.
///
@ -434,9 +434,9 @@ impl<T> TestChild<T> {
fn map_into_string_lines<R>(
&self,
reader: R,
) -> Box<dyn IteratorDebug<Item = std::io::Result<String>>>
) -> Box<dyn IteratorDebug<Item = std::io::Result<String>> + Send>
where
R: Read + Debug + 'static,
R: Read + Debug + Send + 'static,
{
let failure_regexes = self.failure_regexes.clone();
let ignore_regexes = self.ignore_regexes.clone();

View File

@ -103,7 +103,7 @@
//!
//! Please refer to the documentation of each test for more information.
use std::{collections::HashSet, env, fs, path::PathBuf, time::Duration};
use std::{collections::HashSet, env, fs, panic, path::PathBuf, time::Duration};
use color_eyre::{
eyre::{eyre, Result, WrapErr},
@ -1350,7 +1350,7 @@ fn lightwalletd_integration_test(test_type: LightwalletdTestType) -> Result<()>
}
// Launch lightwalletd, if needed
let mut lightwalletd = if test_type.launches_lightwalletd() {
let lightwalletd = if test_type.launches_lightwalletd() {
// Wait until `zebrad` has opened the RPC endpoint
zebrad.expect_stdout_line_matches(regex::escape(
format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(),
@ -1432,60 +1432,64 @@ fn lightwalletd_integration_test(test_type: LightwalletdTestType) -> Result<()>
None
};
if test_type.needs_zebra_cached_state() {
// Wait for Zebra to sync its cached state to the chain tip
zebrad.expect_stdout_line_matches(SYNC_FINISHED_REGEX)?;
// Wait for lightwalletd to sync some blocks
if let Some(ref mut lightwalletd) = lightwalletd {
lightwalletd
.expect_stdout_line_matches("([Aa]dding block to cache)|([Ww]aiting for block)")?;
let (mut zebrad, lightwalletd) = if test_type.needs_zebra_cached_state() {
if let Some(mut lightwalletd) = lightwalletd {
// Wait for lightwalletd to sync to Zebra's tip.
//
// TODO: after the lightwalletd hangs are fixed, fail the test on errors or timeouts
if cfg!(lightwalletd_hang_fix) {
lightwalletd.expect_stdout_line_matches("[Ww]aiting for block")?;
} else {
// To work around a hang bug, we run the test until:
// - lightwalletd starts waiting for blocks (best case scenario)
// - lightwalletd syncs to near the tip (workaround, cached state image is usable)
// - the test times out with an error, but we ignore it
// (workaround, cached state might be usable, slow, or might fail other tests)
//
// TODO: update the regex to `1[8-9][0-9]{5}` when mainnet reaches block 1_800_000
let log_result = lightwalletd.expect_stdout_line_matches(
"([Aa]dding block to cache 1[7-9][0-9]{5})|([Ww]aiting for block)",
);
if log_result.is_err() {
// This error takes up about 100 lines, and looks like a panic message
tracing::warn!(
multi_line_error = ?log_result,
"ignoring a lightwalletd test failure, to work around a lightwalletd hang bug",
);
// "Adding block" and "Waiting for block" logs stop when `lightwalletd` reaches the tip.
// But if the logs just stop, we can't tell the difference between a hang and fully synced.
// So we assume `lightwalletd` will sync and log large groups of blocks,
// and check for logs with heights near the mainnet tip height.
//
// TODO: update the regex to `1[8-9][0-9]{5}` when mainnet reaches block 1_800_000
let lightwalletd_thread = std::thread::spawn(move || -> Result<_> {
tracing::info!(?test_type, "waiting for lightwalletd to sync to the tip");
lightwalletd.expect_stdout_line_matches(
"([Aa]dding block to cache 1[7-9][0-9]{5})|([Ww]aiting for block: 1[7-9][0-9]{5})",
)?;
Ok(lightwalletd)
});
// `lightwalletd` syncs can take a long time,
// so we need to check that `zebrad` has synced to the tip in parallel.
let lightwalletd_thread_and_zebrad = std::thread::spawn(move || -> Result<_> {
tracing::info!(?test_type, "waiting for zebrad to sync to the tip");
while !lightwalletd_thread.is_finished() {
zebrad.expect_stdout_line_matches(SYNC_FINISHED_REGEX)?;
}
}
}
// Check Zebra is still at the tip (also clears and prints Zebra's logs)
zebrad.expect_stdout_line_matches(SYNC_FINISHED_REGEX)?;
Ok((lightwalletd_thread, zebrad))
});
// lightwalletd doesn't log anything when we've reached the tip.
// But when it gets near the tip, it starts using the mempool.
//
// adityapk00/lightwalletd logs mempool changes, but zcash/lightwalletd doesn't.
//
// TODO: re-enable this code when lightwalletd hangs are fixed
if cfg!(lightwalletd_hang_fix) {
if let Some(ref mut lightwalletd) = lightwalletd {
lightwalletd.expect_stdout_line_matches(regex::escape(
"Block hash changed, clearing mempool clients",
))?;
lightwalletd
.expect_stdout_line_matches(regex::escape("Adding new mempool txid"))?;
}
// Retrieve the child process handles from the threads
let (lightwalletd_thread, zebrad) = lightwalletd_thread_and_zebrad
.join()
.unwrap_or_else(|panic_object| panic::resume_unwind(panic_object))?;
let lightwalletd = lightwalletd_thread
.join()
.unwrap_or_else(|panic_object| panic::resume_unwind(panic_object))?;
(zebrad, Some(lightwalletd))
} else {
// We're just syncing Zebra, so there's no lightwalletd to check
tracing::info!(?test_type, "waiting for zebrad to sync to the tip");
zebrad.expect_stdout_line_matches(SYNC_FINISHED_REGEX)?;
(zebrad, None)
}
}
} else {
// We don't have a cached state, so we don't do any tip checks for Zebra or lightwalletd
(zebrad, lightwalletd)
};
tracing::info!(
?test_type,
"cleaning up child processes and checking for errors",
);
// Cleanup both processes
//

View File

@ -44,26 +44,27 @@ pub const BETWEEN_NODES_DELAY: Duration = Duration::from_secs(2);
/// The amount of time we wait for lightwalletd to update to the tip.
///
/// The cached tip can be a few days old, and Zebra needs time to activate its mempool.
///
/// Currently, `zebrad` syncs are slower than `lightwalletd` syncs, so we re-use its timeout.
/// `lightwalletd` takes about 90 minutes to fully sync,
/// and `zebrad` takes about 30 minutes to update to the tip.
///
/// TODO: reduce to 20 minutes when `zebrad` sync performance improves
pub const LIGHTWALLETD_UPDATE_TIP_DELAY: Duration = LIGHTWALLETD_FULL_SYNC_TIP_DELAY;
pub const LIGHTWALLETD_UPDATE_TIP_DELAY: Duration = Duration::from_secs(60 * 60);
/// The amount of time we wait for lightwalletd to do a full sync to the tip.
///
/// `lightwalletd` takes about an hour to fully sync,
/// and Zebra needs time to activate its mempool.
pub const LIGHTWALLETD_FULL_SYNC_TIP_DELAY: Duration = Duration::from_secs(90 * 60);
/// See [`LIGHTWALLETD_UPDATE_TIP_DELAY`] for details.
pub const LIGHTWALLETD_FULL_SYNC_TIP_DELAY: Duration = Duration::from_secs(150 * 60);
/// The amount of extra time we wait for Zebra to sync to the tip,
/// after we ignore a lightwalletd failure.
///
/// Zebra logs a status entry every minute, so there should be at least 4 in this interval.
/// Since we restart `lightwalletd` after a hang, we allow time for another full `lightwalletd` sync.
///
/// See [`LIGHTWALLETD_UPDATE_TIP_DELAY`] for details.
///
/// TODO: remove this extra time when lightwalletd hangs are fixed
pub const ZEBRAD_EXTRA_DELAY_FOR_LIGHTWALLETD_WORKAROUND: Duration = Duration::from_secs(5 * 60);
pub const ZEBRAD_EXTRA_DELAY_FOR_LIGHTWALLETD_WORKAROUND: Duration =
LIGHTWALLETD_FULL_SYNC_TIP_DELAY;
/// Extension trait for methods on `tempfile::TempDir` for using it as a test
/// directory for `zebrad`.

View File

@ -311,17 +311,21 @@ impl LightwalletdTestType {
default_test_config()
};
if !self.needs_zebra_cached_state() {
return Some(config);
}
let zebra_state_path = self.zebrad_state_path(test_name)?;
let mut config = match config {
Ok(config) => config,
Err(error) => return Some(Err(error)),
};
// We want to preload the consensus parameters,
// except when we're doing the quick empty state test
config.consensus.debug_skip_parameter_preload = !self.needs_zebra_cached_state();
if !self.needs_zebra_cached_state() {
return Some(Ok(config));
}
let zebra_state_path = self.zebrad_state_path(test_name)?;
config.sync.checkpoint_verify_concurrency_limit =
zebrad::components::sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;

View File

@ -1,10 +1,12 @@
//! Test sending transactions using a lightwalletd instance connected to a zebrad instance.
//!
//! This test requires a cached chain state that is partially synchronized, i.e., it should be a
//! few blocks below the network chain tip height.
//! few blocks below the network chain tip height. We open this state during the test, but we don't
//! add any blocks to it.
//!
//! The transactions to send are obtained from the blocks synchronized by a temporary zebrad
//! instance that are higher than the chain tip of the cached state.
//! instance that are higher than the chain tip of the cached state. This instance uses a copy of
//! the state.
//!
//! The zebrad instance connected to lightwalletd uses the cached state and does not connect to any
//! external peers, which prevents it from downloading the blocks from where the test transactions
@ -18,7 +20,6 @@ use std::{
use color_eyre::eyre::{eyre, Result};
use futures::TryFutureExt;
use tempfile::TempDir;
use tower::{Service, ServiceExt};
use zebra_chain::{
@ -28,10 +29,7 @@ use zebra_chain::{
use zebra_state::HashOrHeight;
use crate::common::{
cached_state::{
copy_state_directory, load_tip_height_from_state_directory,
start_state_service_with_cache_dir,
},
cached_state::{load_tip_height_from_state_directory, start_state_service_with_cache_dir},
launch::spawn_zebrad_for_rpc_without_initial_peers,
lightwalletd::{
wallet_grpc::{self, connect_to_lightwalletd, spawn_lightwalletd_with_rpc_server},
@ -55,9 +53,10 @@ pub async fn run() -> Result<()> {
let test_type = UpdateCachedState;
let zebrad_state_path = test_type.zebrad_state_path("send_transaction_tests".to_string());
if zebrad_state_path.is_none() {
return Ok(());
}
let zebrad_state_path = match zebrad_state_path {
Some(zebrad_state_path) => zebrad_state_path,
None => return Ok(()),
};
let lightwalletd_state_path =
test_type.lightwalletd_state_path("send_transaction_tests".to_string());
@ -75,17 +74,17 @@ pub async fn run() -> Result<()> {
"running gRPC send transaction test using lightwalletd & zebrad",
);
let (transactions, partial_sync_path) =
load_transactions_from_a_future_block(network, zebrad_state_path.unwrap()).await?;
let transactions =
load_transactions_from_a_future_block(network, zebrad_state_path.clone()).await?;
tracing::info!(
transaction_count = ?transactions.len(),
?partial_sync_path,
partial_sync_path = ?zebrad_state_path,
"got transactions to send",
);
let (_zebrad, zebra_rpc_address) =
spawn_zebrad_for_rpc_without_initial_peers(Network::Mainnet, partial_sync_path, test_type)?;
spawn_zebrad_for_rpc_without_initial_peers(Network::Mainnet, zebrad_state_path, test_type)?;
tracing::info!(
?zebra_rpc_address,
@ -129,37 +128,28 @@ pub async fn run() -> Result<()> {
/// Loads transactions from a block that's after the chain tip of the cached state.
///
/// This copies the cached state into a temporary directory when it is needed to avoid overwriting
/// anything. Two copies are made of the cached state.
/// We copy the cached state to avoid modifying `zebrad_state_path`.
/// This copy is used to launch a `zebrad` instance connected to the network,
/// which finishes synchronizing the chain.
/// Then we load transactions from this updated state.
///
/// The first copy is used by a zebrad instance connected to the network that finishes
/// synchronizing the chain. The transactions are loaded from this updated state.
///
/// The second copy of the state is returned together with the transactions. This means that the
/// returned tuple contains the temporary directory with the partially synchronized chain, and a
/// list of valid transactions that are not in any of the blocks present in that partially
/// synchronized chain.
/// Returns a list of valid transactions that are not in any of the blocks present in the
/// original `zebrad_state_path`.
async fn load_transactions_from_a_future_block(
network: Network,
zebrad_state_path: PathBuf,
) -> Result<(Vec<Arc<Transaction>>, TempDir)> {
tracing::info!(
?network,
?zebrad_state_path,
"preparing partial sync, copying files...",
);
let (partial_sync_path, partial_sync_height) =
prepare_partial_sync(network, zebrad_state_path).await?;
) -> Result<Vec<Arc<Transaction>>> {
let partial_sync_height =
load_tip_height_from_state_directory(network, zebrad_state_path.as_ref()).await?;
tracing::info!(
?partial_sync_height,
?partial_sync_path,
partial_sync_path = ?zebrad_state_path,
"performing full sync...",
);
let full_sync_path =
perform_full_sync_starting_from(network, partial_sync_path.as_ref()).await?;
perform_full_sync_starting_from(network, zebrad_state_path.as_ref()).await?;
tracing::info!(?full_sync_path, "loading transactions...");
@ -167,22 +157,7 @@ async fn load_transactions_from_a_future_block(
load_transactions_from_block_after(partial_sync_height, network, full_sync_path.as_ref())
.await?;
Ok((transactions, partial_sync_path))
}
/// Prepares the temporary directory of the partially synchronized chain.
///
/// Returns a temporary directory that can be used by a Zebra instance, as well as the chain tip
/// height of the partially synchronized chain.
async fn prepare_partial_sync(
network: Network,
zebrad_state_path: PathBuf,
) -> Result<(TempDir, block::Height)> {
let partial_sync_path = copy_state_directory(zebrad_state_path).await?;
let tip_height =
load_tip_height_from_state_directory(network, partial_sync_path.as_ref()).await?;
Ok((partial_sync_path, tip_height))
Ok(transactions)
}
/// Loads transactions from a block that's after the specified `height`.

View File

@ -16,7 +16,11 @@ tonic::include_proto!("cash.z.wallet.sdk.rpc");
pub type LightwalletdRpcClient =
compact_tx_streamer_client::CompactTxStreamerClient<tonic::transport::Channel>;
/// Start a lightwalletd instance with its RPC server functionality enabled.
/// Start a lightwalletd instance connected to `zebrad_rpc_address`,
/// using the `lightwalletd_state_path`, with its gRPC server functionality enabled.
///
/// Expects cached state based on the `test_type`.
/// Waits for `lightwalletd` to log "waiting for block" if `wait_for_blocks` is true.
///
/// Returns the lightwalletd instance and the port number that it is listening on for RPC connections.
pub fn spawn_lightwalletd_with_rpc_server(

View File

@ -110,7 +110,7 @@ pub async fn run() -> Result<()> {
)?;
// Give lightwalletd a few seconds to open its gRPC port before connecting to it
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
tracing::info!(
?lightwalletd_rpc_port,