diff --git a/.github/workflows/ci-integration-tests-gcp.patch-external.yml b/.github/workflows/ci-integration-tests-gcp.patch-external.yml index 7590b3b42..30a82a83a 100644 --- a/.github/workflows/ci-integration-tests-gcp.patch-external.yml +++ b/.github/workflows/ci-integration-tests-gcp.patch-external.yml @@ -86,6 +86,13 @@ jobs: steps: - run: 'echo "No build required"' + scan-task-commands: + name: scan task commands / Run scan-task-commands test + needs: get-available-disks + runs-on: ubuntu-latest + steps: + - run: 'echo "No build required"' + lightwalletd-full-sync: name: lightwalletd tip / Run lwd-full-sync test needs: get-available-disks diff --git a/.github/workflows/ci-integration-tests-gcp.yml b/.github/workflows/ci-integration-tests-gcp.yml index b411d0c75..895440547 100644 --- a/.github/workflows/ci-integration-tests-gcp.yml +++ b/.github/workflows/ci-integration-tests-gcp.yml @@ -631,6 +631,32 @@ jobs: zebra_state_dir: "zebrad-cache" secrets: inherit + # Test that the scan task registers keys, deletes keys, and subscribes to results for keys while running. + # + # Runs: + # - after every PR is merged to `main` + # - on every PR update + # + # If the state version has changed, waits for the new cached states to be created. + # Otherwise, if the state rebuild was skipped, runs immediately after the build job. + scan-task-commands-test: + name: scan task commands + needs: [test-full-sync, get-available-disks] + uses: ./.github/workflows/sub-deploy-integration-tests-gcp.yml + if: ${{ !cancelled() && !failure() && (fromJSON(needs.get-available-disks.outputs.zebra_tip_disk) || needs.test-full-sync.result == 'success') && github.event.inputs.regenerate-disks != 'true' && github.event.inputs.run-full-sync != 'true' && github.event.inputs.run-lwd-sync != 'true' }} + with: + app_name: zebrad + test_id: scan-task-commands + test_description: Test that the scan task registers keys, deletes keys, and subscribes to results for keys while running. + test_variables: "-e NETWORK=${{ inputs.network || vars.ZCASH_NETWORK }} -e TEST_SCAN_TASK_COMMANDS=1 -e ZEBRA_FORCE_USE_COLOR=1 -e ZEBRA_CACHED_STATE_DIR=/var/cache/zebrad-cache" + needs_zebra_state: true + needs_lwd_state: false + saves_to_disk: false + disk_suffix: tip + root_state_path: "/var/cache" + zebra_state_dir: "zebrad-cache" + secrets: inherit + failure-issue: name: Open or update issues for main branch failures # When a new test is added to this workflow, add it to this list. @@ -652,6 +678,7 @@ jobs: get-block-template-test, submit-block-test, scan-start-where-left-test, + scan-task-commands-test ] # Only open tickets for failed scheduled jobs, manual workflow runs, or `main` branch merges. # (PR statuses are already reported in the PR jobs list, and checked by Mergify.) diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index ac7ffbff4..8cd61a49b 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -78,6 +78,7 @@ fi : "${TEST_SUBMIT_BLOCK:=}" : "${TEST_SCAN_START_WHERE_LEFT:=}" : "${ENTRYPOINT_FEATURES:=}" +: "${TEST_SCAN_TASK_COMMANDS:=}" # Configuration file path if [[ -n "${ZEBRA_CONF_DIR}" ]] && [[ -n "${ZEBRA_CONF_FILE}" ]] && [[ -z "${ZEBRA_CONF_PATH}" ]]; then @@ -349,6 +350,11 @@ case "$1" in # Test that the scanner can continue scanning where it was left when zebrad restarts. check_directory_files "${ZEBRA_CACHED_STATE_DIR}" run_cargo_test "shielded-scan" "scan_start_where_left" + + elif [[ "${TEST_SCAN_TASK_COMMANDS}" -eq "1" ]]; then + # Test that the scanner can continue scanning where it was left when zebrad restarts. + check_directory_files "${ZEBRA_CACHED_STATE_DIR}" + run_cargo_test "shielded-scan" "scan_task_commands" else exec "$@" diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs index 5c5aeeae1..426ec94ba 100644 --- a/zebra-scan/src/service.rs +++ b/zebra-scan/src/service.rs @@ -118,10 +118,12 @@ impl Service for ScanService { return async move { // Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT` - let remove_keys_result = - tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?) - .await - .map_err(|_| "timeout waiting for delete keys done notification"); + let remove_keys_result = tokio::time::timeout( + DELETE_KEY_TIMEOUT, + scan_task.remove_keys(keys.clone())?, + ) + .await + .map_err(|_| "timeout waiting for delete keys done notification"); // Delete the key from the database after either confirmation that it's been removed from the scan task, or // waiting `DELETE_KEY_TIMEOUT`. diff --git a/zebra-scan/src/service/scan_task/commands.rs b/zebra-scan/src/service/scan_task/commands.rs index 5a210c1ec..b7a5fe0d6 100644 --- a/zebra-scan/src/service/scan_task/commands.rs +++ b/zebra-scan/src/service/scan_task/commands.rs @@ -183,14 +183,11 @@ impl ScanTask { /// Returns a oneshot channel receiver to notify the caller when the keys have been removed. pub fn remove_keys( &mut self, - keys: &[String], + keys: Vec, ) -> Result, TrySendError> { let (done_tx, done_rx) = oneshot::channel(); - self.send(ScanTaskCommand::RemoveKeys { - keys: keys.to_vec(), - done_tx, - })?; + self.send(ScanTaskCommand::RemoveKeys { keys, done_tx })?; Ok(done_rx) } diff --git a/zebra-scan/src/service/scan_task/tests/vectors.rs b/zebra-scan/src/service/scan_task/tests/vectors.rs index 9022da794..b3c1a0959 100644 --- a/zebra-scan/src/service/scan_task/tests/vectors.rs +++ b/zebra-scan/src/service/scan_task/tests/vectors.rs @@ -92,7 +92,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> { // Check that it removes keys correctly let sapling_keys = mock_sapling_scanning_keys(30, network); - let done_rx = mock_scan_task.remove_keys(&sapling_keys)?; + let done_rx = mock_scan_task.remove_keys(sapling_keys.clone())?; let (new_keys, _new_results_senders, _new_results_receivers) = ScanTask::process_messages(&mut cmd_receiver, &mut parsed_keys, network)?; @@ -111,7 +111,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> { mock_scan_task.register_keys(sapling_keys_with_birth_heights.clone())?; - mock_scan_task.remove_keys(&sapling_keys)?; + mock_scan_task.remove_keys(sapling_keys.clone())?; let (new_keys, _new_results_senders, _new_results_receivers) = ScanTask::process_messages(&mut cmd_receiver, &mut parsed_keys, network)?; @@ -125,7 +125,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> { mock_scan_task.register_keys(sapling_keys_with_birth_heights.clone())?; - mock_scan_task.remove_keys(&sapling_keys)?; + mock_scan_task.remove_keys(sapling_keys.clone())?; mock_scan_task.register_keys(sapling_keys_with_birth_heights[..2].to_vec())?; diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 0aad78c4e..c27bd0098 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -122,16 +122,10 @@ //! //! ## Shielded scanning tests //! -//! Example of how to run the scans_for_new_key test: +//! Example of how to run the scan_task_commands test: //! //! ```console -//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test scans_for_new_key --features shielded-scan --release -- --ignored --nocapture -//! ``` -//! -//! Example of how to run the scan_subscribe_results test: -//! -//! ```console -//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test scan_subscribe_results --features shielded-scan -- --ignored --nocapture +//! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test scan_task_commands --features shielded-scan --release -- --ignored --nocapture //! ``` //! //! ## Checkpoint Generation Tests @@ -3013,24 +3007,16 @@ fn scan_start_where_left() -> Result<()> { Ok(()) } -/// Test successful registration of a new key in the scan task. -/// -/// See [`common::shielded_scan::scans_for_new_key`] for more information. // TODO: Add this test to CI (#8236) +/// Tests successful: +/// - Registration of a new key, +/// - Subscription to scan results of new key, and +/// - Deletion of keys +/// in the scan task +/// See [`common::shielded_scan::scan_task_commands`] for more information. #[tokio::test] #[ignore] #[cfg(feature = "shielded-scan")] -async fn scans_for_new_key() -> Result<()> { - common::shielded_scan::scans_for_new_key::run().await -} - -/// Tests SubscribeResults ScanService request. -/// -/// See [`common::shielded_scan::subscribe_results`] for more information. -// TODO: Add this test to CI (#8236) -#[tokio::test] -#[ignore] -#[cfg(feature = "shielded-scan")] -async fn scan_subscribe_results() -> Result<()> { - common::shielded_scan::subscribe_results::run().await +async fn scan_task_commands() -> Result<()> { + common::shielded_scan::scan_task_commands::run().await } diff --git a/zebrad/tests/common/shielded_scan.rs b/zebrad/tests/common/shielded_scan.rs index e3478f180..106d6a16a 100644 --- a/zebrad/tests/common/shielded_scan.rs +++ b/zebrad/tests/common/shielded_scan.rs @@ -1,4 +1,3 @@ //! Acceptance tests for `shielded-scan`` feature in zebrad. -pub(crate) mod scans_for_new_key; -pub(crate) mod subscribe_results; +pub(crate) mod scan_task_commands; diff --git a/zebrad/tests/common/shielded_scan/subscribe_results.rs b/zebrad/tests/common/shielded_scan/scan_task_commands.rs similarity index 56% rename from zebrad/tests/common/shielded_scan/subscribe_results.rs rename to zebrad/tests/common/shielded_scan/scan_task_commands.rs index 9806619ad..e1d2fd630 100644 --- a/zebrad/tests/common/shielded_scan/subscribe_results.rs +++ b/zebrad/tests/common/shielded_scan/scan_task_commands.rs @@ -1,15 +1,16 @@ -//! Test registering and subscribing to the results for a new key in the scan task while zebrad is running. +//! Test registering keys, subscribing to their results, and deleting keys in the scan task while zebrad is running. //! //! This test requires a cached chain state that is partially synchronized past the //! Sapling activation height and [`REQUIRED_MIN_TIP_HEIGHT`] //! //! export ZEBRA_CACHED_STATE_DIR="/path/to/zebra/state" -//! cargo test scan_subscribe_results --features="shielded-scan" -- --ignored --nocapture +//! cargo test scan_task_commands --features="shielded-scan" -- --ignored --nocapture -use std::time::Duration; +use std::{fs, time::Duration}; use color_eyre::{eyre::eyre, Result}; +use tokio::sync::mpsc::error::TryRecvError; use tower::ServiceBuilder; use zebra_chain::{ block::Height, @@ -17,7 +18,11 @@ use zebra_chain::{ parameters::{Network, NetworkUpgrade}, }; -use zebra_scan::{service::ScanTask, storage::Storage, tests::ZECPAGES_SAPLING_VIEWING_KEY}; +use zebra_scan::{ + service::ScanTask, + storage::{db::SCANNER_DATABASE_KIND, Storage}, + tests::ZECPAGES_SAPLING_VIEWING_KEY, +}; use crate::common::{ cached_state::start_state_service_with_cache_dir, launch::can_spawn_zebrad_for_test_type, @@ -28,15 +33,24 @@ use crate::common::{ const REQUIRED_MIN_TIP_HEIGHT: Height = Height(1_000_000); /// How long this test waits for a result before failing. -const WAIT_FOR_RESULTS_DURATION: Duration = Duration::from_secs(30 * 60); +/// Should be long enough for ScanTask to start and scan ~500 blocks +const WAIT_FOR_RESULTS_DURATION: Duration = Duration::from_secs(60); -/// Initialize Zebra's state service with a cached state, add a new key to the scan task, and -/// check that it stores results for the new key without errors. +/// A block height where a scan result can be found with the [`ZECPAGES_SAPLING_VIEWING_KEY`] +const EXPECTED_RESULT_HEIGHT: Height = Height(780_532); + +/// Initialize Zebra's state service with a cached state, then: +/// - Start the scan task, +/// - Add a new key, +/// - Subscribe to results for that key, +/// - Check that the scanner sends an expected result, +/// - Remove the key and, +/// - Check that the results channel is disconnected pub(crate) async fn run() -> Result<()> { let _init_guard = zebra_test::init(); let test_type = TestType::UpdateZebraCachedStateNoRpc; - let test_name = "scan_subscribe_results"; + let test_name = "scan_task_commands"; let network = Network::Mainnet; // Skip the test unless the user specifically asked for it and there is a zebrad_state_path @@ -54,8 +68,19 @@ pub(crate) async fn run() -> Result<()> { .zebrad_state_path(test_name) .expect("already checked that there is a cached state path"); + let mut scan_config = zebra_scan::Config::default(); + scan_config.db_config_mut().cache_dir = zebrad_state_path.clone(); + + // Logs the network as zebrad would as part of the metadata when starting up. + // This is currently needed for the 'Check startup logs' step in CI to pass. + tracing::info!("Zcash network: {network}"); + + // Remove the scan directory before starting. + let scan_db_path = zebrad_state_path.join(SCANNER_DATABASE_KIND); + fs::remove_dir_all(std::path::Path::new(&scan_db_path)).ok(); + let (state_service, _read_state_service, latest_chain_tip, chain_tip_change) = - start_state_service_with_cache_dir(network, zebrad_state_path).await?; + start_state_service_with_cache_dir(network, zebrad_state_path.clone()).await?; let chain_tip_height = latest_chain_tip .best_tip_height() @@ -80,7 +105,7 @@ pub(crate) async fn run() -> Result<()> { let state = ServiceBuilder::new().buffer(10).service(state_service); // Create an ephemeral `Storage` instance - let storage = Storage::new(&zebra_scan::Config::ephemeral(), network, false); + let storage = Storage::new(&scan_config, network, false); let mut scan_task = ScanTask::spawn(storage, state, chain_tip_change); tracing::info!("started scan task, sending register/subscribe keys messages with zecpages key to start scanning for a new key",); @@ -89,12 +114,12 @@ pub(crate) async fn run() -> Result<()> { scan_task.register_keys( keys.iter() .cloned() - .map(|key| (key, Some(780_000))) + .map(|key| (key, Some(EXPECTED_RESULT_HEIGHT.0))) .collect(), )?; let mut result_receiver = scan_task - .subscribe(keys.into_iter().collect()) + .subscribe(keys.iter().cloned().collect()) .await .expect("should send and receive message successfully"); @@ -103,5 +128,34 @@ pub(crate) async fn run() -> Result<()> { tracing::info!(?result, "received a result from the channel"); + let result = result.expect("there should be some scan result"); + + assert_eq!( + EXPECTED_RESULT_HEIGHT, result.height, + "result height should match expected height for hard-coded key" + ); + + scan_task.remove_keys(keys.to_vec())?; + + // Wait for scan task to drop results sender + tokio::time::sleep(WAIT_FOR_RESULTS_DURATION).await; + + loop { + match result_receiver.try_recv() { + // Empty any messages in the buffer + Ok(_) => continue, + + Err(recv_error) => { + assert_eq!( + recv_error, + TryRecvError::Disconnected, + "any result senders should have been dropped" + ); + + break; + } + } + } + Ok(()) } diff --git a/zebrad/tests/common/shielded_scan/scans_for_new_key.rs b/zebrad/tests/common/shielded_scan/scans_for_new_key.rs deleted file mode 100644 index b7e5109c3..000000000 --- a/zebrad/tests/common/shielded_scan/scans_for_new_key.rs +++ /dev/null @@ -1,120 +0,0 @@ -//! Test registering and scanning for a new key in the scan task while zebrad is running. -//! -//! This test requires a cached chain state that is partially synchronized past the -//! Sapling activation height and [`REQUIRED_MIN_TIP_HEIGHT`] -//! -//! export ZEBRA_CACHED_STATE_DIR="/path/to/zebra/state" -//! cargo test scans_for_new_key --release --features="shielded-scan" -- --ignored --nocapture - -use std::time::Duration; - -use color_eyre::{eyre::eyre, Result}; - -use tower::ServiceBuilder; -use zebra_chain::{ - block::Height, - chain_tip::ChainTip, - parameters::{Network, NetworkUpgrade}, -}; -use zebra_scan::{service::ScanTask, storage::Storage, tests::ZECPAGES_SAPLING_VIEWING_KEY}; - -use crate::common::{ - cached_state::start_state_service_with_cache_dir, launch::can_spawn_zebrad_for_test_type, - test_type::TestType, -}; - -/// The minimum required tip height for the cached state in this test. -const REQUIRED_MIN_TIP_HEIGHT: Height = Height(1_000_000); - -/// How long this test waits after registering keys to check if there are any results. -const WAIT_FOR_RESULTS_DURATION: Duration = Duration::from_secs(60); - -/// Initialize Zebra's state service with a cached state, add a new key to the scan task, and -/// check that it stores results for the new key without errors. -pub(crate) async fn run() -> Result<()> { - let _init_guard = zebra_test::init(); - - let test_type = TestType::UpdateZebraCachedStateNoRpc; - let test_name = "scans_for_new_key"; - let network = Network::Mainnet; - - // Skip the test unless the user specifically asked for it and there is a zebrad_state_path - if !can_spawn_zebrad_for_test_type(test_name, test_type, true) { - return Ok(()); - } - - tracing::info!( - ?network, - ?test_type, - "running scans_for_new_key test using zebra state service", - ); - - let zebrad_state_path = test_type - .zebrad_state_path(test_name) - .expect("already checked that there is a cached state path"); - - let shielded_scan_config = zebra_scan::Config::default(); - - let (state_service, _read_state_service, latest_chain_tip, chain_tip_change) = - start_state_service_with_cache_dir(network, zebrad_state_path).await?; - - let chain_tip_height = latest_chain_tip - .best_tip_height() - .ok_or_else(|| eyre!("State directory doesn't have a chain tip block"))?; - - let sapling_activation_height = NetworkUpgrade::Sapling - .activation_height(network) - .expect("there should be an activation height for Mainnet"); - - assert!( - sapling_activation_height < REQUIRED_MIN_TIP_HEIGHT, - "minimum tip height should be above sapling activation height" - ); - - assert!( - REQUIRED_MIN_TIP_HEIGHT < chain_tip_height, - "chain tip height must be above required minimum tip height" - ); - - tracing::info!("opened state service with valid chain tip height, deleting any past keys in db and starting scan task",); - - // Before spawning `ScanTask`, delete past results for the zecpages key, if any. - let mut storage = Storage::new(&shielded_scan_config, network, false); - storage.delete_sapling_keys(vec![ZECPAGES_SAPLING_VIEWING_KEY.to_string()]); - - let state = ServiceBuilder::new().buffer(10).service(state_service); - - let mut scan_task = ScanTask::spawn(storage, state, chain_tip_change); - - tracing::info!("started scan task, sending register keys message with zecpages key to start scanning for a new key",); - - scan_task.register_keys( - [(ZECPAGES_SAPLING_VIEWING_KEY.to_string(), None)] - .into_iter() - .collect(), - )?; - - tracing::info!( - ?WAIT_FOR_RESULTS_DURATION, - "sent message, waiting for scan task to add some results", - ); - - // Wait for the scan task to add some results - tokio::time::sleep(WAIT_FOR_RESULTS_DURATION).await; - - // Check that there are some results in the database for the key - - let storage = Storage::new(&shielded_scan_config, network, true); - - let results = storage.sapling_results(&ZECPAGES_SAPLING_VIEWING_KEY.to_string()); - - tracing::info!(?results, "got the results"); - - // Check that some results were added for the zecpages key that was not in the config or the db when ScanTask started. - assert!( - !results.is_empty(), - "there should be results for the newly registered key" - ); - - Ok(()) -}