zebra/tower-batch/tests/worker.rs

122 lines
3.5 KiB
Rust
Raw Normal View History

//! Fixed test cases for batch worker tasks.
use std::time::Duration;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, task};
use tower::{Service, ServiceExt};
use tower_batch::{error, Batch};
use tower_test::mock;
#[tokio::test]
async fn wakes_pending_waiters_on_close() {
let _init_guard = zebra_test::init();
let (service, mut handle) = mock::pair::<_, ()>();
let (mut service, worker) = Batch::pair(service, 1, 1, Duration::from_secs(1));
let mut worker = task::spawn(worker.run());
// // keep the request in the worker
handle.allow(0);
Update to Tokio 1.13.0 (#2994) * Update `tower` to version `0.4.9` Update to latest version to add support for Tokio version 1. * Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. * Update Tokio dependency to version `1.13.0` This will break the build because the code isn't ready for the update, but future commits will fix the issues. * Replace import of `tokio::stream::StreamExt` Use `futures::stream::StreamExt` instead, because newer versions of Tokio don't have the `stream` feature. * Use `IntervalStream` in `zebra-network` In newer versions of Tokio `Interval` doesn't implement `Stream`, so the wrapper types from `tokio-stream` have to be used instead. * Use `IntervalStream` in `inventory_registry` In newer versions of Tokio the `Interval` type doesn't implement `Stream`, so `tokio_stream::wrappers::IntervalStream` has to be used instead. * Use `BroadcastStream` in `inventory_registry` In newer versions of Tokio `broadcast::Receiver` doesn't implement `Stream`, so `tokio_stream::wrappers::BroadcastStream` instead. This also requires changing the error type that is used. * Handle `Semaphore::acquire` error in `tower-batch` Newer versions of Tokio can return an error if the semaphore is closed. This shouldn't happen in `tower-batch` because the semaphore is never closed. * Handle `Semaphore::acquire` error in `zebrad` test On newer versions of Tokio `Semaphore::acquire` can return an error if the semaphore is closed. This shouldn't happen in the test because the semaphore is never closed. * Update some `zebra-network` dependencies Use versions compatible with Tokio version 1. * Upgrade Hyper to version 0.14 Use a version that supports Tokio version 1. * Update `metrics` dependency to version 0.17 And also update the `metrics-exporter-prometheus` to version 0.6.1. These updates are to make sure Tokio 1 is supported. * Use `f64` as the histogram data type `u64` isn't supported as the histogram data type in newer versions of `metrics`. * Update the initialization of the metrics component Make it compatible with the new version of `metrics`. * Simplify build version counter Remove all constants and use the new `metrics::incement_counter!` macro. * Change metrics output line to match on The snapshot string isn't included in the newer version of `metrics-exporter-prometheus`. * Update `sentry` to version 0.23.0 Use a version compatible with Tokio version 1. * Remove usage of `TracingIntegration` This seems to not be available from `sentry-tracing` anymore, so it needs to be replaced. * Add sentry layer to tracing initialization This seems like the replacement for `TracingIntegration`. * Remove unnecessary conversion Suggested by a Clippy lint. * Update Cargo lock file Apply all of the updates to dependencies. * Ban duplicate tokio dependencies Also ban git sources for tokio dependencies. * Stop allowing sentry-tracing git repository in `deny.toml` * Allow remaining duplicates after the tokio upgrade * Use C: drive for CI build output on Windows GitHub Actions uses a Windows image with two disk drives, and the default D: drive is smaller than the C: drive. Zebra currently uses a lot of space to build, so it has to use the C: drive to avoid CI build failures because of insufficient space. Co-authored-by: teor <teor@riseup.net>
2021-11-02 11:46:57 -07:00
let service1 = service.ready().await.unwrap();
let poll = worker.poll();
assert_pending!(poll);
let mut response = task::spawn(service1.call(()));
let mut service1 = service.clone();
Update to Tokio 1.13.0 (#2994) * Update `tower` to version `0.4.9` Update to latest version to add support for Tokio version 1. * Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. * Update Tokio dependency to version `1.13.0` This will break the build because the code isn't ready for the update, but future commits will fix the issues. * Replace import of `tokio::stream::StreamExt` Use `futures::stream::StreamExt` instead, because newer versions of Tokio don't have the `stream` feature. * Use `IntervalStream` in `zebra-network` In newer versions of Tokio `Interval` doesn't implement `Stream`, so the wrapper types from `tokio-stream` have to be used instead. * Use `IntervalStream` in `inventory_registry` In newer versions of Tokio the `Interval` type doesn't implement `Stream`, so `tokio_stream::wrappers::IntervalStream` has to be used instead. * Use `BroadcastStream` in `inventory_registry` In newer versions of Tokio `broadcast::Receiver` doesn't implement `Stream`, so `tokio_stream::wrappers::BroadcastStream` instead. This also requires changing the error type that is used. * Handle `Semaphore::acquire` error in `tower-batch` Newer versions of Tokio can return an error if the semaphore is closed. This shouldn't happen in `tower-batch` because the semaphore is never closed. * Handle `Semaphore::acquire` error in `zebrad` test On newer versions of Tokio `Semaphore::acquire` can return an error if the semaphore is closed. This shouldn't happen in the test because the semaphore is never closed. * Update some `zebra-network` dependencies Use versions compatible with Tokio version 1. * Upgrade Hyper to version 0.14 Use a version that supports Tokio version 1. * Update `metrics` dependency to version 0.17 And also update the `metrics-exporter-prometheus` to version 0.6.1. These updates are to make sure Tokio 1 is supported. * Use `f64` as the histogram data type `u64` isn't supported as the histogram data type in newer versions of `metrics`. * Update the initialization of the metrics component Make it compatible with the new version of `metrics`. * Simplify build version counter Remove all constants and use the new `metrics::incement_counter!` macro. * Change metrics output line to match on The snapshot string isn't included in the newer version of `metrics-exporter-prometheus`. * Update `sentry` to version 0.23.0 Use a version compatible with Tokio version 1. * Remove usage of `TracingIntegration` This seems to not be available from `sentry-tracing` anymore, so it needs to be replaced. * Add sentry layer to tracing initialization This seems like the replacement for `TracingIntegration`. * Remove unnecessary conversion Suggested by a Clippy lint. * Update Cargo lock file Apply all of the updates to dependencies. * Ban duplicate tokio dependencies Also ban git sources for tokio dependencies. * Stop allowing sentry-tracing git repository in `deny.toml` * Allow remaining duplicates after the tokio upgrade * Use C: drive for CI build output on Windows GitHub Actions uses a Windows image with two disk drives, and the default D: drive is smaller than the C: drive. Zebra currently uses a lot of space to build, so it has to use the C: drive to avoid CI build failures because of insufficient space. Co-authored-by: teor <teor@riseup.net>
2021-11-02 11:46:57 -07:00
let mut ready1 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready1.poll(), "no capacity");
let mut service1 = service.clone();
Update to Tokio 1.13.0 (#2994) * Update `tower` to version `0.4.9` Update to latest version to add support for Tokio version 1. * Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. * Update Tokio dependency to version `1.13.0` This will break the build because the code isn't ready for the update, but future commits will fix the issues. * Replace import of `tokio::stream::StreamExt` Use `futures::stream::StreamExt` instead, because newer versions of Tokio don't have the `stream` feature. * Use `IntervalStream` in `zebra-network` In newer versions of Tokio `Interval` doesn't implement `Stream`, so the wrapper types from `tokio-stream` have to be used instead. * Use `IntervalStream` in `inventory_registry` In newer versions of Tokio the `Interval` type doesn't implement `Stream`, so `tokio_stream::wrappers::IntervalStream` has to be used instead. * Use `BroadcastStream` in `inventory_registry` In newer versions of Tokio `broadcast::Receiver` doesn't implement `Stream`, so `tokio_stream::wrappers::BroadcastStream` instead. This also requires changing the error type that is used. * Handle `Semaphore::acquire` error in `tower-batch` Newer versions of Tokio can return an error if the semaphore is closed. This shouldn't happen in `tower-batch` because the semaphore is never closed. * Handle `Semaphore::acquire` error in `zebrad` test On newer versions of Tokio `Semaphore::acquire` can return an error if the semaphore is closed. This shouldn't happen in the test because the semaphore is never closed. * Update some `zebra-network` dependencies Use versions compatible with Tokio version 1. * Upgrade Hyper to version 0.14 Use a version that supports Tokio version 1. * Update `metrics` dependency to version 0.17 And also update the `metrics-exporter-prometheus` to version 0.6.1. These updates are to make sure Tokio 1 is supported. * Use `f64` as the histogram data type `u64` isn't supported as the histogram data type in newer versions of `metrics`. * Update the initialization of the metrics component Make it compatible with the new version of `metrics`. * Simplify build version counter Remove all constants and use the new `metrics::incement_counter!` macro. * Change metrics output line to match on The snapshot string isn't included in the newer version of `metrics-exporter-prometheus`. * Update `sentry` to version 0.23.0 Use a version compatible with Tokio version 1. * Remove usage of `TracingIntegration` This seems to not be available from `sentry-tracing` anymore, so it needs to be replaced. * Add sentry layer to tracing initialization This seems like the replacement for `TracingIntegration`. * Remove unnecessary conversion Suggested by a Clippy lint. * Update Cargo lock file Apply all of the updates to dependencies. * Ban duplicate tokio dependencies Also ban git sources for tokio dependencies. * Stop allowing sentry-tracing git repository in `deny.toml` * Allow remaining duplicates after the tokio upgrade * Use C: drive for CI build output on Windows GitHub Actions uses a Windows image with two disk drives, and the default D: drive is smaller than the C: drive. Zebra currently uses a lot of space to build, so it has to use the C: drive to avoid CI build failures because of insufficient space. Co-authored-by: teor <teor@riseup.net>
2021-11-02 11:46:57 -07:00
let mut ready2 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready2.poll(), "no capacity");
// kill the worker task
drop(worker);
let err = assert_ready_err!(response.poll());
assert!(
err.is::<error::Closed>(),
"response should fail with a Closed, got: {err:?}",
);
assert!(
ready1.is_woken(),
"dropping worker should wake ready task 1",
);
let err = assert_ready_err!(ready1.poll());
assert!(
err.is::<error::ServiceError>(),
"ready 1 should fail with a ServiceError {{ Closed }}, got: {err:?}",
);
assert!(
ready2.is_woken(),
"dropping worker should wake ready task 2",
);
let err = assert_ready_err!(ready1.poll());
assert!(
err.is::<error::ServiceError>(),
"ready 2 should fail with a ServiceError {{ Closed }}, got: {err:?}",
);
}
#[tokio::test]
async fn wakes_pending_waiters_on_failure() {
let _init_guard = zebra_test::init();
let (service, mut handle) = mock::pair::<_, ()>();
let (mut service, worker) = Batch::pair(service, 1, 1, Duration::from_secs(1));
let mut worker = task::spawn(worker.run());
// keep the request in the worker
handle.allow(0);
Update to Tokio 1.13.0 (#2994) * Update `tower` to version `0.4.9` Update to latest version to add support for Tokio version 1. * Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. * Update Tokio dependency to version `1.13.0` This will break the build because the code isn't ready for the update, but future commits will fix the issues. * Replace import of `tokio::stream::StreamExt` Use `futures::stream::StreamExt` instead, because newer versions of Tokio don't have the `stream` feature. * Use `IntervalStream` in `zebra-network` In newer versions of Tokio `Interval` doesn't implement `Stream`, so the wrapper types from `tokio-stream` have to be used instead. * Use `IntervalStream` in `inventory_registry` In newer versions of Tokio the `Interval` type doesn't implement `Stream`, so `tokio_stream::wrappers::IntervalStream` has to be used instead. * Use `BroadcastStream` in `inventory_registry` In newer versions of Tokio `broadcast::Receiver` doesn't implement `Stream`, so `tokio_stream::wrappers::BroadcastStream` instead. This also requires changing the error type that is used. * Handle `Semaphore::acquire` error in `tower-batch` Newer versions of Tokio can return an error if the semaphore is closed. This shouldn't happen in `tower-batch` because the semaphore is never closed. * Handle `Semaphore::acquire` error in `zebrad` test On newer versions of Tokio `Semaphore::acquire` can return an error if the semaphore is closed. This shouldn't happen in the test because the semaphore is never closed. * Update some `zebra-network` dependencies Use versions compatible with Tokio version 1. * Upgrade Hyper to version 0.14 Use a version that supports Tokio version 1. * Update `metrics` dependency to version 0.17 And also update the `metrics-exporter-prometheus` to version 0.6.1. These updates are to make sure Tokio 1 is supported. * Use `f64` as the histogram data type `u64` isn't supported as the histogram data type in newer versions of `metrics`. * Update the initialization of the metrics component Make it compatible with the new version of `metrics`. * Simplify build version counter Remove all constants and use the new `metrics::incement_counter!` macro. * Change metrics output line to match on The snapshot string isn't included in the newer version of `metrics-exporter-prometheus`. * Update `sentry` to version 0.23.0 Use a version compatible with Tokio version 1. * Remove usage of `TracingIntegration` This seems to not be available from `sentry-tracing` anymore, so it needs to be replaced. * Add sentry layer to tracing initialization This seems like the replacement for `TracingIntegration`. * Remove unnecessary conversion Suggested by a Clippy lint. * Update Cargo lock file Apply all of the updates to dependencies. * Ban duplicate tokio dependencies Also ban git sources for tokio dependencies. * Stop allowing sentry-tracing git repository in `deny.toml` * Allow remaining duplicates after the tokio upgrade * Use C: drive for CI build output on Windows GitHub Actions uses a Windows image with two disk drives, and the default D: drive is smaller than the C: drive. Zebra currently uses a lot of space to build, so it has to use the C: drive to avoid CI build failures because of insufficient space. Co-authored-by: teor <teor@riseup.net>
2021-11-02 11:46:57 -07:00
let service1 = service.ready().await.unwrap();
assert_pending!(worker.poll());
let mut response = task::spawn(service1.call("hello"));
let mut service1 = service.clone();
Update to Tokio 1.13.0 (#2994) * Update `tower` to version `0.4.9` Update to latest version to add support for Tokio version 1. * Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. * Update Tokio dependency to version `1.13.0` This will break the build because the code isn't ready for the update, but future commits will fix the issues. * Replace import of `tokio::stream::StreamExt` Use `futures::stream::StreamExt` instead, because newer versions of Tokio don't have the `stream` feature. * Use `IntervalStream` in `zebra-network` In newer versions of Tokio `Interval` doesn't implement `Stream`, so the wrapper types from `tokio-stream` have to be used instead. * Use `IntervalStream` in `inventory_registry` In newer versions of Tokio the `Interval` type doesn't implement `Stream`, so `tokio_stream::wrappers::IntervalStream` has to be used instead. * Use `BroadcastStream` in `inventory_registry` In newer versions of Tokio `broadcast::Receiver` doesn't implement `Stream`, so `tokio_stream::wrappers::BroadcastStream` instead. This also requires changing the error type that is used. * Handle `Semaphore::acquire` error in `tower-batch` Newer versions of Tokio can return an error if the semaphore is closed. This shouldn't happen in `tower-batch` because the semaphore is never closed. * Handle `Semaphore::acquire` error in `zebrad` test On newer versions of Tokio `Semaphore::acquire` can return an error if the semaphore is closed. This shouldn't happen in the test because the semaphore is never closed. * Update some `zebra-network` dependencies Use versions compatible with Tokio version 1. * Upgrade Hyper to version 0.14 Use a version that supports Tokio version 1. * Update `metrics` dependency to version 0.17 And also update the `metrics-exporter-prometheus` to version 0.6.1. These updates are to make sure Tokio 1 is supported. * Use `f64` as the histogram data type `u64` isn't supported as the histogram data type in newer versions of `metrics`. * Update the initialization of the metrics component Make it compatible with the new version of `metrics`. * Simplify build version counter Remove all constants and use the new `metrics::incement_counter!` macro. * Change metrics output line to match on The snapshot string isn't included in the newer version of `metrics-exporter-prometheus`. * Update `sentry` to version 0.23.0 Use a version compatible with Tokio version 1. * Remove usage of `TracingIntegration` This seems to not be available from `sentry-tracing` anymore, so it needs to be replaced. * Add sentry layer to tracing initialization This seems like the replacement for `TracingIntegration`. * Remove unnecessary conversion Suggested by a Clippy lint. * Update Cargo lock file Apply all of the updates to dependencies. * Ban duplicate tokio dependencies Also ban git sources for tokio dependencies. * Stop allowing sentry-tracing git repository in `deny.toml` * Allow remaining duplicates after the tokio upgrade * Use C: drive for CI build output on Windows GitHub Actions uses a Windows image with two disk drives, and the default D: drive is smaller than the C: drive. Zebra currently uses a lot of space to build, so it has to use the C: drive to avoid CI build failures because of insufficient space. Co-authored-by: teor <teor@riseup.net>
2021-11-02 11:46:57 -07:00
let mut ready1 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready1.poll(), "no capacity");
let mut service1 = service.clone();
Update to Tokio 1.13.0 (#2994) * Update `tower` to version `0.4.9` Update to latest version to add support for Tokio version 1. * Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. * Update Tokio dependency to version `1.13.0` This will break the build because the code isn't ready for the update, but future commits will fix the issues. * Replace import of `tokio::stream::StreamExt` Use `futures::stream::StreamExt` instead, because newer versions of Tokio don't have the `stream` feature. * Use `IntervalStream` in `zebra-network` In newer versions of Tokio `Interval` doesn't implement `Stream`, so the wrapper types from `tokio-stream` have to be used instead. * Use `IntervalStream` in `inventory_registry` In newer versions of Tokio the `Interval` type doesn't implement `Stream`, so `tokio_stream::wrappers::IntervalStream` has to be used instead. * Use `BroadcastStream` in `inventory_registry` In newer versions of Tokio `broadcast::Receiver` doesn't implement `Stream`, so `tokio_stream::wrappers::BroadcastStream` instead. This also requires changing the error type that is used. * Handle `Semaphore::acquire` error in `tower-batch` Newer versions of Tokio can return an error if the semaphore is closed. This shouldn't happen in `tower-batch` because the semaphore is never closed. * Handle `Semaphore::acquire` error in `zebrad` test On newer versions of Tokio `Semaphore::acquire` can return an error if the semaphore is closed. This shouldn't happen in the test because the semaphore is never closed. * Update some `zebra-network` dependencies Use versions compatible with Tokio version 1. * Upgrade Hyper to version 0.14 Use a version that supports Tokio version 1. * Update `metrics` dependency to version 0.17 And also update the `metrics-exporter-prometheus` to version 0.6.1. These updates are to make sure Tokio 1 is supported. * Use `f64` as the histogram data type `u64` isn't supported as the histogram data type in newer versions of `metrics`. * Update the initialization of the metrics component Make it compatible with the new version of `metrics`. * Simplify build version counter Remove all constants and use the new `metrics::incement_counter!` macro. * Change metrics output line to match on The snapshot string isn't included in the newer version of `metrics-exporter-prometheus`. * Update `sentry` to version 0.23.0 Use a version compatible with Tokio version 1. * Remove usage of `TracingIntegration` This seems to not be available from `sentry-tracing` anymore, so it needs to be replaced. * Add sentry layer to tracing initialization This seems like the replacement for `TracingIntegration`. * Remove unnecessary conversion Suggested by a Clippy lint. * Update Cargo lock file Apply all of the updates to dependencies. * Ban duplicate tokio dependencies Also ban git sources for tokio dependencies. * Stop allowing sentry-tracing git repository in `deny.toml` * Allow remaining duplicates after the tokio upgrade * Use C: drive for CI build output on Windows GitHub Actions uses a Windows image with two disk drives, and the default D: drive is smaller than the C: drive. Zebra currently uses a lot of space to build, so it has to use the C: drive to avoid CI build failures because of insufficient space. Co-authored-by: teor <teor@riseup.net>
2021-11-02 11:46:57 -07:00
let mut ready2 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready2.poll(), "no capacity");
// fail the inner service
handle.send_error("foobar");
// worker task terminates
assert_ready!(worker.poll());
let err = assert_ready_err!(response.poll());
assert!(
err.is::<error::ServiceError>(),
"response should fail with a ServiceError, got: {err:?}"
);
assert!(
ready1.is_woken(),
"dropping worker should wake ready task 1"
);
let err = assert_ready_err!(ready1.poll());
assert!(
err.is::<error::ServiceError>(),
"ready 1 should fail with a ServiceError, got: {err:?}"
);
assert!(
ready2.is_woken(),
"dropping worker should wake ready task 2"
);
let err = assert_ready_err!(ready1.poll());
assert!(
err.is::<error::ServiceError>(),
"ready 2 should fail with a ServiceError, got: {err:?}"
);
}