From 877c194b1b839ebfad72cc773fb27f07d1ec2a2e Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 4 Dec 2019 09:53:52 -0500 Subject: [PATCH] Update tower-limit and prepare for release (#375) * wip * Refactor limit tests and prep for release --- Cargo.toml | 2 +- tower-limit/Cargo.toml | 20 +-- tower-limit/src/lib.rs | 2 +- tower-limit/src/rate/service.rs | 12 +- tower-limit/tests/concurrency.rs | 216 ++++++++++++------------------- tower-limit/tests/rate.rs | 63 ++++----- tower-test/Cargo.toml | 4 + tower-test/src/macros.rs | 25 ++-- tower-test/src/mock/mod.rs | 44 +++---- tower-test/src/mock/spawn.rs | 72 +++++++++++ tower-test/tests/mock.rs | 45 +++---- 11 files changed, 242 insertions(+), 263 deletions(-) create mode 100644 tower-test/src/mock/spawn.rs diff --git a/Cargo.toml b/Cargo.toml index 3f3d775..465b736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ # "tower-filter", # "tower-hedge", "tower-layer", - # "tower-limit", + "tower-limit", # "tower-load", # "tower-load-shed", # "tower-ready-cache", diff --git a/tower-limit/Cargo.toml b/tower-limit/Cargo.toml index 0bf6bdd..9679752 100644 --- a/tower-limit/Cargo.toml +++ b/tower-limit/Cargo.toml @@ -8,13 +8,13 @@ name = "tower-limit" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.3.0-alpha.2" +version = "0.3.0" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" repository = "https://github.com/tower-rs/tower" homepage = "https://github.com/tower-rs/tower" -documentation = "https://docs.rs/tower-limit/0.3.0-alpha.2" +documentation = "https://docs.rs/tower-limit/0.3.0" description = """ Limit maximum request rate to a `Service`. """ @@ -22,14 +22,14 @@ categories = ["asynchronous", "network-programming"] edition = "2018" [dependencies] -futures-core-preview = "=0.3.0-alpha.19" -tower-service = { version = "=0.3.0-alpha.2", path = "../tower-service" } -tower-layer = { version = "=0.3.0-alpha.2", path = "../tower-layer" } -tokio-sync = "=0.2.0-alpha.6" -tokio-timer = "=0.3.0-alpha.6" +futures-core = "0.3" +tower-service = { version = "0.3", path = "../tower-service" } +tower-layer = { version = "0.3", path = "../tower-layer" } +tokio-sync = "0.2.0-alpha.6" +tokio = { version = "0.2", features = ["time"] } pin-project = "0.4" [dev-dependencies] -tower-test = { version = "=0.3.0-alpha.2", path = "../tower-test" } -tokio-test = "=0.2.0-alpha.6" -futures-util-preview = "=0.3.0-alpha.19" +tower-test = { version = "0.3", path = "../tower-test" } +tokio-test = "0.2" +tokio = { version = "0.2", features = ["macros", "test-util"] } diff --git a/tower-limit/src/lib.rs b/tower-limit/src/lib.rs index cbcabb3..8c5d466 100644 --- a/tower-limit/src/lib.rs +++ b/tower-limit/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0-alpha.2")] +#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0")] #![warn( missing_debug_implementations, missing_docs, diff --git a/tower-limit/src/rate/service.rs b/tower-limit/src/rate/service.rs index bab787e..ff82228 100644 --- a/tower-limit/src/rate/service.rs +++ b/tower-limit/src/rate/service.rs @@ -5,11 +5,9 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio_timer::{clock, Delay}; +use tokio::time::{Delay, Instant}; use tower_service::Service; -use std::time::Instant; - /// Enforces a rate limit on the number of requests the underlying /// service can handle over a period of time. #[derive(Debug)] @@ -30,7 +28,7 @@ impl RateLimit { /// Create a new rate limiter pub fn new(inner: T, rate: Rate) -> Self { let state = State::Ready { - until: clock::now(), + until: Instant::now(), rem: rate.num(), }; @@ -74,7 +72,7 @@ where } self.state = State::Ready { - until: clock::now() + self.rate.per(), + until: Instant::now() + self.rate.per(), rem: self.rate.num(), }; @@ -84,7 +82,7 @@ where fn call(&mut self, request: Request) -> Self::Future { match self.state { State::Ready { mut until, mut rem } => { - let now = clock::now(); + let now = Instant::now(); // If the period has elapsed, reset it. if now >= until { @@ -99,7 +97,7 @@ where self.state = State::Ready { until, rem }; } else { // The service is disabled until further notice - let sleep = tokio_timer::delay(until); + let sleep = tokio::time::delay_until(until); self.state = State::Limited(sleep); } diff --git a/tower-limit/tests/concurrency.rs b/tower-limit/tests/concurrency.rs index 79f8ade..8027d98 100644 --- a/tower-limit/tests/concurrency.rs +++ b/tower-limit/tests/concurrency.rs @@ -1,27 +1,21 @@ -use futures_util::{future::poll_fn, pin_mut}; -use tokio_test::{assert_pending, assert_ready, assert_ready_ok, block_on, task::MockTask}; -use tower_limit::concurrency::ConcurrencyLimit; -use tower_service::Service; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; +use tower_limit::concurrency::ConcurrencyLimitLayer; use tower_test::{assert_request_eq, mock}; -#[test] -fn basic_service_limit_functionality_with_poll_ready() { - let mut task = MockTask::new(); +#[tokio::test] +async fn basic_service_limit_functionality_with_poll_ready() { + let limit = ConcurrencyLimitLayer::new(2); + let (mut service, mut handle) = mock::spawn_layer(limit); - let (mut service, handle) = new_service(2); - pin_mut!(handle); - - block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap(); + assert_ready_ok!(service.poll_ready()); let r1 = service.call("hello 1"); - block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap(); + assert_ready_ok!(service.poll_ready()); let r2 = service.call("hello 2"); - task.enter(|cx| { - assert_pending!(service.poll_ready(cx)); - }); + assert_pending!(service.poll_ready()); - assert!(!task.is_woken()); + assert!(!service.is_woken()); // The request gets passed through assert_request_eq!(handle, "hello 1").send_response("world 1"); @@ -30,232 +24,184 @@ fn basic_service_limit_functionality_with_poll_ready() { assert_request_eq!(handle, "hello 2").send_response("world 2"); // There are no more requests - task.enter(|cx| { - assert_pending!(handle.as_mut().poll_request(cx)); - }); + assert_pending!(handle.poll_request()); - assert_eq!(block_on(r1).unwrap(), "world 1"); - assert!(task.is_woken()); + assert_eq!(r1.await.unwrap(), "world 1"); + + assert!(service.is_woken()); // Another request can be sent - task.enter(|cx| { - assert_ready_ok!(service.poll_ready(cx)); - }); + assert_ready_ok!(service.poll_ready()); let r3 = service.call("hello 3"); - task.enter(|cx| { - assert_pending!(service.poll_ready(cx)); - }); + assert_pending!(service.poll_ready()); - assert_eq!(block_on(r2).unwrap(), "world 2"); + assert_eq!(r2.await.unwrap(), "world 2"); // The request gets passed through assert_request_eq!(handle, "hello 3").send_response("world 3"); - assert_eq!(block_on(r3).unwrap(), "world 3"); + assert_eq!(r3.await.unwrap(), "world 3"); } -#[test] -fn basic_service_limit_functionality_without_poll_ready() { - let mut task = MockTask::new(); +#[tokio::test] +async fn basic_service_limit_functionality_without_poll_ready() { + let limit = ConcurrencyLimitLayer::new(2); + let (mut service, mut handle) = mock::spawn_layer(limit); - let (mut service, handle) = new_service(2); - pin_mut!(handle); - - assert_ready_ok!(task.enter(|cx| service.poll_ready(cx))); + assert_ready_ok!(service.poll_ready()); let r1 = service.call("hello 1"); - assert_ready_ok!(task.enter(|cx| service.poll_ready(cx))); + assert_ready_ok!(service.poll_ready()); let r2 = service.call("hello 2"); - assert_pending!(task.enter(|cx| service.poll_ready(cx))); + assert_pending!(service.poll_ready()); // The request gets passed through assert_request_eq!(handle, "hello 1").send_response("world 1"); - assert!(!task.is_woken()); + assert!(!service.is_woken()); // The next request gets passed through assert_request_eq!(handle, "hello 2").send_response("world 2"); - assert!(!task.is_woken()); + assert!(!service.is_woken()); // There are no more requests - assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx))); + assert_pending!(handle.poll_request()); - assert_eq!(block_on(r1).unwrap(), "world 1"); + assert_eq!(r1.await.unwrap(), "world 1"); - assert!(task.is_woken()); + assert!(service.is_woken()); // One more request can be sent - assert_ready_ok!(task.enter(|cx| service.poll_ready(cx))); + assert_ready_ok!(service.poll_ready()); let r4 = service.call("hello 4"); - assert_pending!(task.enter(|cx| service.poll_ready(cx))); + assert_pending!(service.poll_ready()); - assert_eq!(block_on(r2).unwrap(), "world 2"); - assert!(task.is_woken()); + assert_eq!(r2.await.unwrap(), "world 2"); + assert!(service.is_woken()); // The request gets passed through assert_request_eq!(handle, "hello 4").send_response("world 4"); - assert_eq!(block_on(r4).unwrap(), "world 4"); + assert_eq!(r4.await.unwrap(), "world 4"); } -#[test] -fn request_without_capacity() { - let mut task = MockTask::new(); +#[tokio::test] +async fn request_without_capacity() { + let limit = ConcurrencyLimitLayer::new(0); + let (mut service, _) = mock::spawn_layer::<(), (), _>(limit); - let (mut service, _) = new_service(0); - - task.enter(|cx| { - assert_pending!(service.poll_ready(cx)); - }); + assert_pending!(service.poll_ready()); } -#[test] -fn reserve_capacity_without_sending_request() { - let mut task = MockTask::new(); - - let (mut s1, handle) = new_service(1); - pin_mut!(handle); +#[tokio::test] +async fn reserve_capacity_without_sending_request() { + let limit = ConcurrencyLimitLayer::new(1); + let (mut s1, mut handle) = mock::spawn_layer(limit); let mut s2 = s1.clone(); // Reserve capacity in s1 - task.enter(|cx| { - assert_ready_ok!(s1.poll_ready(cx)); - }); + assert_ready_ok!(s1.poll_ready()); // Service 2 cannot get capacity - task.enter(|cx| { - assert_pending!(s2.poll_ready(cx)); - }); + assert_pending!(s2.poll_ready()); // s1 sends the request, then s2 is able to get capacity let r1 = s1.call("hello"); assert_request_eq!(handle, "hello").send_response("world"); - task.enter(|cx| { - assert_pending!(s2.poll_ready(cx)); - }); + assert_pending!(s2.poll_ready()); - block_on(r1).unwrap(); + r1.await.unwrap(); - task.enter(|cx| { - assert_ready_ok!(s2.poll_ready(cx)); - }); + assert_ready_ok!(s2.poll_ready()); } -#[test] -fn service_drop_frees_capacity() { - let mut task = MockTask::new(); - - let (mut s1, _handle) = new_service(1); +#[tokio::test] +async fn service_drop_frees_capacity() { + let limit = ConcurrencyLimitLayer::new(1); + let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit); let mut s2 = s1.clone(); // Reserve capacity in s1 - assert_ready_ok!(task.enter(|cx| s1.poll_ready(cx))); + assert_ready_ok!(s1.poll_ready()); // Service 2 cannot get capacity - task.enter(|cx| { - assert_pending!(s2.poll_ready(cx)); - }); + assert_pending!(s2.poll_ready()); drop(s1); - assert!(task.is_woken()); - assert_ready_ok!(task.enter(|cx| s2.poll_ready(cx))); + assert!(s2.is_woken()); + assert_ready_ok!(s2.poll_ready()); } -#[test] -fn response_error_releases_capacity() { - let mut task = MockTask::new(); - - let (mut s1, handle) = new_service(1); - pin_mut!(handle); +#[tokio::test] +async fn response_error_releases_capacity() { + let limit = ConcurrencyLimitLayer::new(1); + let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit); let mut s2 = s1.clone(); // Reserve capacity in s1 - task.enter(|cx| { - assert_ready_ok!(s1.poll_ready(cx)); - }); + assert_ready_ok!(s1.poll_ready()); // s1 sends the request, then s2 is able to get capacity let r1 = s1.call("hello"); assert_request_eq!(handle, "hello").send_error("boom"); - block_on(r1).unwrap_err(); + r1.await.unwrap_err(); - task.enter(|cx| { - assert_ready_ok!(s2.poll_ready(cx)); - }); + assert_ready_ok!(s2.poll_ready()); } -#[test] -fn response_future_drop_releases_capacity() { - let mut task = MockTask::new(); - - let (mut s1, _handle) = new_service(1); +#[tokio::test] +async fn response_future_drop_releases_capacity() { + let limit = ConcurrencyLimitLayer::new(1); + let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit); let mut s2 = s1.clone(); // Reserve capacity in s1 - task.enter(|cx| { - assert_ready_ok!(s1.poll_ready(cx)); - }); + assert_ready_ok!(s1.poll_ready()); // s1 sends the request, then s2 is able to get capacity let r1 = s1.call("hello"); - task.enter(|cx| { - assert_pending!(s2.poll_ready(cx)); - }); + assert_pending!(s2.poll_ready()); drop(r1); - task.enter(|cx| { - assert_ready_ok!(s2.poll_ready(cx)); - }); + assert_ready_ok!(s2.poll_ready()); } -#[test] -fn multi_waiters() { - let mut task1 = MockTask::new(); - let mut task2 = MockTask::new(); - let mut task3 = MockTask::new(); - - let (mut s1, _handle) = new_service(1); +#[tokio::test] +async fn multi_waiters() { + let limit = ConcurrencyLimitLayer::new(1); + let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit); let mut s2 = s1.clone(); let mut s3 = s1.clone(); // Reserve capacity in s1 - task1.enter(|cx| assert_ready_ok!(s1.poll_ready(cx))); + assert_ready_ok!(s1.poll_ready()); // s2 and s3 are not ready - task2.enter(|cx| assert_pending!(s2.poll_ready(cx))); - task3.enter(|cx| assert_pending!(s3.poll_ready(cx))); + assert_pending!(s2.poll_ready()); + assert_pending!(s3.poll_ready()); drop(s1); - assert!(task2.is_woken()); - assert!(!task3.is_woken()); + assert!(s2.is_woken()); + assert!(!s3.is_woken()); drop(s2); - assert!(task3.is_woken()); -} - -type Mock = mock::Mock<&'static str, &'static str>; -type Handle = mock::Handle<&'static str, &'static str>; - -fn new_service(max: usize) -> (ConcurrencyLimit, Handle) { - let (service, handle) = mock::pair(); - let service = ConcurrencyLimit::new(service, max); - (service, handle) + assert!(s3.is_woken()); } diff --git a/tower-limit/tests/rate.rs b/tower-limit/tests/rate.rs index bb824c5..49b9fc6 100644 --- a/tower-limit/tests/rate.rs +++ b/tower-limit/tests/rate.rs @@ -1,52 +1,35 @@ -use futures_util::pin_mut; -use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task::MockTask}; -use tower_limit::rate::*; -use tower_service::*; +use std::time::Duration; +use tokio::time; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; +use tower_limit::rate::RateLimitLayer; use tower_test::{assert_request_eq, mock}; -use std::future::Future; -use std::time::Duration; +#[tokio::test] +async fn reaching_capacity() { + time::pause(); -#[test] -fn reaching_capacity() { - clock::mock(|time| { - let mut task = MockTask::new(); + let rate_limit = RateLimitLayer::new(1, Duration::from_millis(100)); - let (mut service, handle) = new_service(Rate::new(1, from_millis(100))); - pin_mut!(handle); + let (mut service, mut handle) = mock::spawn_layer(rate_limit); - assert_ready_ok!(task.enter(|cx| service.poll_ready(cx))); - let response = service.call("hello"); - pin_mut!(response); + assert_ready_ok!(service.poll_ready()); - assert_request_eq!(handle, "hello").send_response("world"); - assert_ready_ok!(task.enter(|cx| response.poll(cx)), "world"); + let response = service.call("hello"); - assert_pending!(task.enter(|cx| service.poll_ready(cx))); - assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx))); + assert_request_eq!(handle, "hello").send_response("world"); - time.advance(Duration::from_millis(100)); + assert_eq!(response.await.unwrap(), "world"); + assert_pending!(service.poll_ready()); - assert_ready_ok!(task.enter(|cx| service.poll_ready(cx))); + assert_pending!(handle.poll_request()); - // Send a second request - let response = service.call("two"); - pin_mut!(response); + time::advance(Duration::from_millis(101)).await; - assert_request_eq!(handle, "two").send_response("done"); - assert_ready_ok!(task.enter(|cx| response.poll(cx)), "done"); - }); -} - -type Mock = mock::Mock<&'static str, &'static str>; -type Handle = mock::Handle<&'static str, &'static str>; - -fn new_service(rate: Rate) -> (RateLimit, Handle) { - let (service, handle) = mock::pair(); - let service = RateLimit::new(service, rate); - (service, handle) -} - -fn from_millis(n: u64) -> Duration { - Duration::from_millis(n) + assert_ready_ok!(service.poll_ready()); + + let response = service.call("two"); + + assert_request_eq!(handle, "two").send_response("done"); + + assert_eq!(response.await.unwrap(), "done"); } diff --git a/tower-test/Cargo.toml b/tower-test/Cargo.toml index 7ce36ea..e924e6a 100644 --- a/tower-test/Cargo.toml +++ b/tower-test/Cargo.toml @@ -24,6 +24,10 @@ edition = "2018" [dependencies] futures-util = "0.3" tokio = { version = "0.2", features = ["sync"]} +tower-layer = { version = "0.3", path = "../tower-layer" } tokio-test = "0.2" tower-service = { version = "0.3", path = "../tower-service" } pin-project = "0.4" + +[dev-dependencies] +tokio = { version = "0.2", features = ["macros"] } \ No newline at end of file diff --git a/tower-test/src/macros.rs b/tower-test/src/macros.rs index aaa4ef7..68b1f44 100644 --- a/tower-test/src/macros.rs +++ b/tower-test/src/macros.rs @@ -7,25 +7,20 @@ /// # Examples /// /// ```rust -/// #[macro_use] -/// extern crate tower_test; -/// extern crate tower_service; -/// /// use tower_service::Service; -/// use tower_test::mock; -/// use std::task::{Poll, Context}; +/// use tower_test::{mock, assert_request_eq}; /// use tokio_test::assert_ready; -/// use futures_util::pin_mut; /// -/// # fn main() { -/// mock::task_fn(|cx, mock, handle|{ -/// assert_ready!(mock.poll_ready(cx)); +/// # async fn test() { +/// let (mut service, mut handle) = mock::spawn(); /// -/// let _response = mock.call("hello"); +/// assert_ready!(service.poll_ready()); /// -/// assert_request_eq!(handle, "hello") -/// .send_response("world"); -/// }); +/// let response = service.call("hello"); +/// +/// assert_request_eq!(handle, "hello").send_response("world"); +/// +/// assert_eq!(response.await.unwrap(), "world"); /// # } /// ``` #[macro_export] @@ -34,7 +29,7 @@ macro_rules! assert_request_eq { assert_request_eq!($mock_handle, $expect,) }; ($mock_handle:expr, $expect:expr, $($arg:tt)*) => {{ - let (actual, send_response) = match $mock_handle.as_mut().next_request() { + let (actual, send_response) = match $mock_handle.next_request().await { Some(r) => r, None => panic!("expected a request but none was received."), }; diff --git a/tower-test/src/mock/mod.rs b/tower-test/src/mock/mod.rs index 7576d3c..5f830a0 100644 --- a/tower-test/src/mock/mod.rs +++ b/tower-test/src/mock/mod.rs @@ -2,35 +2,39 @@ pub mod error; pub mod future; +pub mod spawn; -use crate::mock::{error::Error, future::ResponseFuture}; +use crate::mock::{error::Error, future::ResponseFuture, spawn::Spawn}; use core::task::Waker; use tokio::sync::{mpsc, oneshot}; +use tower_layer::Layer; use tower_service::Service; use std::{ collections::HashMap, future::Future, - pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, u64, }; -/// Mock a task and Service. -pub fn task_fn(mut f: F) +/// Spawn a layer onto a mock service. +pub fn spawn_layer(layer: L) -> (Spawn, Handle) where - F: FnMut(&mut Context, &mut Pin<&mut Mock>, &mut Pin<&mut Handle>), + L: Layer>, { - tokio_test::task::spawn(()).enter(|cx, _| { - let (mock, handle) = pair(); + let (inner, handle) = pair(); + let svc = layer.layer(inner); - futures_util::pin_mut!(mock); - futures_util::pin_mut!(handle); + (Spawn::new(svc), handle) +} - f(cx, &mut mock, &mut handle) - }) +/// Spawn a Service onto a mock task. +pub fn spawn() -> (Spawn>, Handle) { + let (svc, handle) = pair(); + + (Spawn::new(svc), handle) } /// A mock service @@ -212,21 +216,13 @@ impl Drop for Mock { impl Handle { /// Asynchronously gets the next request - pub fn poll_request( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - Box::pin(self.rx.recv()).as_mut().poll(cx) + pub fn poll_request(&mut self) -> Poll>> { + tokio_test::task::spawn(()).enter(|cx, _| Box::pin(self.rx.recv()).as_mut().poll(cx)) } - /// Synchronously gets the next request. - /// - /// This function blocks the current thread until a request is received. - pub fn next_request(mut self: Pin<&mut Self>) -> Option> { - use futures_util::future::poll_fn; - use tokio_test::block_on; - - block_on(poll_fn(|cx| self.as_mut().poll_request(cx))) + /// Gets the next request. + pub async fn next_request(&mut self) -> Option> { + self.rx.recv().await } /// Allow a certain number of requests diff --git a/tower-test/src/mock/spawn.rs b/tower-test/src/mock/spawn.rs new file mode 100644 index 0000000..32d2313 --- /dev/null +++ b/tower-test/src/mock/spawn.rs @@ -0,0 +1,72 @@ +//! Spawn mock services onto a mock task. + +use std::task::Poll; +use tokio_test::task; +use tower_service::Service; + +/// Service spawned on a mock task +#[derive(Debug)] +pub struct Spawn { + inner: T, + task: task::Spawn<()>, +} + +impl Spawn { + /// Create a new spawn. + pub fn new(inner: T) -> Self { + Self { + inner, + task: task::spawn(()), + } + } + + /// Check if this service has been woken up. + pub fn is_woken(&self) -> bool { + self.task.is_woken() + } + + /// Get how many futurs are holding onto the waker. + pub fn waker_ref_count(&self) -> usize { + self.task.waker_ref_count() + } + + /// Poll this service ready. + pub fn poll_ready(&mut self) -> Poll> + where + T: Service, + { + let task = &mut self.task; + let inner = &mut self.inner; + + task.enter(|cx, _| inner.poll_ready(cx)) + } + + /// Call the inner Service. + pub fn call(&mut self, req: Request) -> T::Future + where + T: Service, + { + self.inner.call(req) + } + + /// Get the inner service. + pub fn into_inner(self) -> T { + self.inner + } + + /// Get a reference to the inner service. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Get a mutable reference to the inner service. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl Clone for Spawn { + fn clone(&self) -> Self { + Spawn::new(self.inner.clone()) + } +} diff --git a/tower-test/tests/mock.rs b/tower-test/tests/mock.rs index 62b7868..1dd43b5 100644 --- a/tower-test/tests/mock.rs +++ b/tower-test/tests/mock.rs @@ -1,44 +1,29 @@ -use futures_util::pin_mut; -use std::future::Future; use tokio_test::{assert_pending, assert_ready}; -use tower_service::Service; use tower_test::{assert_request_eq, mock}; -#[test] -fn single_request_ready() { - mock::task_fn::(|cx, mock, handle| { - // No pending requests - assert!(handle.as_mut().poll_request(cx).is_pending()); +#[tokio::test] +async fn single_request_ready() { + let (mut service, mut handle) = mock::spawn(); - // Issue a request - assert_ready!(mock.poll_ready(cx)).unwrap(); + assert_pending!(handle.poll_request()); - let response = mock.call("hello?".into()); - pin_mut!(response); + assert_ready!(service.poll_ready()).unwrap(); - // Get the request from the handle - let send_response = assert_request_eq!(handle, "hello?"); + let response = service.call("hello"); - // Response is not ready - assert_pending!(response.as_mut().poll(cx)); + assert_request_eq!(handle, "hello").send_response("world"); - // Send the response - send_response.send_response("yes?".into()); - - assert_eq!(tokio_test::block_on(response).unwrap().as_str(), "yes?"); - }); + assert_eq!(response.await.unwrap(), "world"); } -#[test] +#[tokio::test] #[should_panic] -fn backpressure() { - mock::task_fn::(|cx, mock, handle| { - handle.allow(0); +async fn backpressure() { + let (mut service, mut handle) = mock::spawn::<_, ()>(); - // Make sure the mock cannot accept more requests - assert_pending!(mock.poll_ready(cx)); + handle.allow(0); - // Try to send a request - mock.call("hello?".into()); - }); + assert_pending!(service.poll_ready()); + + service.call("hello").await.unwrap(); }