diff --git a/Cargo.toml b/Cargo.toml index b00d828..874eb51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ members = [ "tower-buffer", "tower-discover", "tower-filter", - # "tower-hedge", + "tower-hedge", "tower-layer", "tower-limit", "tower-load", diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index ba82037..5ca87b8 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -40,7 +40,6 @@ tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" } tower-load = { version = "=0.3.0-alpha.1", path = "../tower-load" } tower-service = "=0.3.0-alpha.1" tower-make = { version = "=0.3.0-alpha.1", path = "../tower-make" } -tower-util = { version = "=0.3.0-alpha.1", path = "../tower-util" } slab = "0.4" [dev-dependencies] diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index ab84d11..2b9b3f1 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -4,6 +4,7 @@ use futures_util::{stream, try_future, try_future::TryFutureExt}; use indexmap::IndexMap; use pin_project::pin_project; use rand::{rngs::SmallRng, FromEntropy}; +use std::marker::PhantomData; use std::{ future::Future, pin::Pin, @@ -13,7 +14,6 @@ use tokio_sync::oneshot; use tower_discover::{Change, Discover}; use tower_load::Load; use tower_service::Service; -use tower_util::Ready; use tracing::{debug, trace}; /// Distributes requests across inner services using the [Power of Two Choices][p2c]. @@ -50,6 +50,8 @@ pub struct Balance { next_ready_index: Option, rng: SmallRng, + + _req: PhantomData, } #[pin_project] @@ -61,8 +63,9 @@ struct UnreadyService { key: Option, #[pin] cancel: oneshot::Receiver<()>, - #[pin] - ready: tower_util::Ready, + service: Option, + + _req: PhantomData, } enum Error { @@ -84,6 +87,8 @@ where cancelations: IndexMap::default(), unready_services: stream::FuturesUnordered::new(), next_ready_index: None, + + _req: PhantomData, } } @@ -139,8 +144,9 @@ where self.cancelations.insert(key.clone(), tx); self.unready_services.push(UnreadyService { key: Some(key), - ready: Ready::new(svc), + service: Some(svc), cancel: rx, + _req: PhantomData, }); } @@ -342,15 +348,18 @@ impl, Req> Future for UnreadyService { return Poll::Ready(Err((key, Error::Canceled))); } - match ready!(this.ready.poll(cx)) { - Ok(svc) => { - let key = this.key.take().expect("polled after ready"); - Poll::Ready(Ok((key, svc))) - } - Err(e) => { - let key = this.key.take().expect("polled after ready"); - Poll::Ready(Err((key, Error::Inner(e)))) - } + let res = ready!(this + .service + .as_mut() + .expect("poll after ready") + .poll_ready(cx)); + + let key = this.key.take().expect("polled after ready"); + let svc = this.service.take().expect("polled after ready"); + + match res { + Ok(()) => Poll::Ready(Ok((key, svc))), + Err(e) => Poll::Ready(Err((key, Error::Inner(e)))), } } } diff --git a/tower-hedge/Cargo.toml b/tower-hedge/Cargo.toml index 9fcf134..64caa39 100644 --- a/tower-hedge/Cargo.toml +++ b/tower-hedge/Cargo.toml @@ -1,18 +1,20 @@ [package] name = "tower-hedge" -version = "0.1.0" +version = "0.3.0-alpha.1" authors = ["Alex Leong "] +edition = "2018" publish = false [dependencies] -futures = "0.1" hdrhistogram = "6.0" log = "0.4.1" -tower-service = "0.2.0" -tower-filter = { version = "0.1", path = "../tower-filter" } -tokio-mock-task = { git = "https://github.com/carllerche/tokio-mock-task" } -tokio-timer = "0.2.6" +tower-service = "0.3.0-alpha.1" +tower-filter = { version = "0.3.0-alpha.1", path = "../tower-filter" } +tokio-timer = "0.3.0-alpha.4" +futures-util-preview = "0.3.0-alpha.18" +pin-project = "0.4.0-alpha.10" [dev-dependencies] -tower-test = { version = "0.1", path = "../tower-test" } -tokio-executor = "0.1.2" +tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" } +tokio-test = "0.2.0-alpha.4" +tokio-executor = "0.2.0-alpha.4" diff --git a/tower-hedge/src/delay.rs b/tower-hedge/src/delay.rs index 21ff048..e94691f 100644 --- a/tower-hedge/src/delay.rs +++ b/tower-hedge/src/delay.rs @@ -1,7 +1,12 @@ -use futures::{Async, Future, Poll}; -use tower_service::Service; - +use futures_util::ready; +use pin_project::{pin_project, project}; use std::time::Duration; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; /// A policy which specifies how long each request should be delayed for. pub trait Policy { @@ -16,16 +21,19 @@ pub struct Delay { service: S, } +#[pin_project] #[derive(Debug)] pub struct ResponseFuture { service: S, + #[pin] state: State, } +#[pin_project] #[derive(Debug)] enum State { - Delaying(tokio_timer::Delay, Option), - Called(F), + Delaying(#[pin] tokio_timer::Delay, Option), + Called(#[pin] F), } impl Delay { @@ -49,8 +57,8 @@ where type Error = super::Error; type Future = ResponseFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(|e| e.into()) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx).map_err(|e| e.into()) } fn call(&mut self, request: Request) -> Self::Future { @@ -63,37 +71,36 @@ where }; ResponseFuture { service: orig, - state: State::Delaying(tokio_timer::Delay::new(deadline), Some(request)), + state: State::Delaying(tokio_timer::delay(deadline), Some(request)), } } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - F: Future, - F::Error: Into, - S: Service, + F: Future>, + E: Into, + S: Service, { - type Item = F::Item; - type Error = super::Error; + type Output = Result; + + #[project] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - fn poll(&mut self) -> Poll { loop { - let next = match self.state { - State::Delaying(ref mut delay, ref mut req) => match delay.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(())) => { - let req = req.take().expect("Missing request in delay"); - let fut = self.service.call(req); - State::Called(fut) - } - Err(e) => return Err(e.into()), - }, - State::Called(ref mut fut) => { - return fut.poll().map_err(|e| e.into()); + #[project] + match this.state.project() { + State::Delaying(delay, req) => { + ready!(delay.poll(cx)); + let req = req.take().expect("Missing request in delay"); + let fut = this.service.call(req); + this.state.set(State::Called(fut)); + } + State::Called(fut) => { + return fut.poll(cx).map_err(Into::into); } }; - self.state = next; } } } diff --git a/tower-hedge/src/latency.rs b/tower-hedge/src/latency.rs index 5633857..20065b6 100644 --- a/tower-hedge/src/latency.rs +++ b/tower-hedge/src/latency.rs @@ -1,9 +1,14 @@ -use futures::{Async, Future, Poll}; +use futures_util::ready; +use pin_project::pin_project; +use std::time::{Duration, Instant}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tokio_timer::clock; use tower_service::Service; -use std::time::{Duration, Instant}; - /// Record is the interface for accepting request latency measurements. When /// a request completes, record is called with the elapsed duration between /// when the service was called and when the future completed. @@ -19,10 +24,12 @@ pub struct Latency { service: S, } +#[pin_project] #[derive(Debug)] pub struct ResponseFuture { start: Instant, rec: R, + #[pin] inner: F, } @@ -42,15 +49,15 @@ where impl Service for Latency where S: Service, - S::Error: Into, + super::Error: From, R: Record + Clone, { type Response = S::Response; type Error = super::Error; type Future = ResponseFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(|e| e.into()) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx).map_err(|e| e.into()) } fn call(&mut self, request: Request) -> Self::Future { @@ -62,24 +69,20 @@ where } } -impl Future for ResponseFuture +impl Future for ResponseFuture where R: Record, - F: Future, - F::Error: Into, + F: Future>, + super::Error: From, { - type Item = F::Item; - type Error = super::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.inner.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(rsp)) => { - let duration = clock::now() - self.start; - self.rec.record(duration); - Ok(Async::Ready(rsp)) - } - Err(e) => Err(e.into()), - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let rsp = ready!(this.inner.poll(cx))?; + let duration = clock::now() - *this.start; + this.rec.record(duration); + Poll::Ready(Ok(rsp)) } } diff --git a/tower-hedge/src/lib.rs b/tower-hedge/src/lib.rs index dc4b761..c95638b 100644 --- a/tower-hedge/src/lib.rs +++ b/tower-hedge/src/lib.rs @@ -3,18 +3,16 @@ #![deny(warnings)] #![deny(missing_docs)] -extern crate futures; -extern crate hdrhistogram; -#[macro_use] -extern crate log; -extern crate tokio_timer; -extern crate tower_filter; -extern crate tower_service; -use futures::future::FutureResult; -use futures::{future, Poll}; +use futures_util::future; +use log::error; +use pin_project::pin_project; use std::sync::{Arc, Mutex}; use std::time::Duration; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; use tower_filter::Filter; mod delay; @@ -38,12 +36,16 @@ type Service = select::Select< /// future or the retry future completes, that value is used. #[derive(Debug)] pub struct Hedge(Service); + +#[pin_project] /// The Future returned by the hedge Service. -pub struct Future( as tower_service::Service>::Future) +pub struct Future where - S: tower_service::Service + Clone, - S::Error: Into, - P: Policy + Clone; + S: tower_service::Service, +{ + #[pin] + inner: S::Future, +} type Error = Box; @@ -58,15 +60,20 @@ pub trait Policy { fn can_retry(&self, req: &Request) -> bool; } +// NOTE: these are pub only because they appear inside a Future + +#[doc(hidden)] #[derive(Clone, Debug)] -struct PolicyPredicate

(P); +pub struct PolicyPredicate

(P); +#[doc(hidden)] #[derive(Debug)] -struct DelayPolicy { +pub struct DelayPolicy { histo: Histo, latency_percentile: f32, } +#[doc(hidden)] #[derive(Debug)] -struct SelectPolicy

{ +pub struct SelectPolicy

{ policy: P, histo: Histo, min_data_points: u64, @@ -83,7 +90,7 @@ impl Hedge { ) -> Hedge where S: tower_service::Service + Clone, - S::Error: Into, + Error: From, P: Policy + Clone, { let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); @@ -102,7 +109,7 @@ impl Hedge { ) -> Hedge where S: tower_service::Service + Clone, - S::Error: Into, + Error: From, P: Policy + Clone, { let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); @@ -124,7 +131,7 @@ impl Hedge { ) -> Hedge where S: tower_service::Service + Clone, - S::Error: Into, + Error: From, P: Policy + Clone, { // Clone the underlying service and wrap both copies in a middleware that @@ -157,33 +164,33 @@ impl Hedge { impl tower_service::Service for Hedge where S: tower_service::Service + Clone, - S::Error: Into, + Error: From, P: Policy + Clone, { type Response = S::Response; type Error = Error; - type Future = Future; + type Future = Future, Request>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) } fn call(&mut self, request: Request) -> Self::Future { - Future(self.0.call(request)) + Future { + inner: self.0.call(request), + } } } -impl futures::Future for Future +impl std::future::Future for Future where - S: tower_service::Service + Clone, - S::Error: Into, - P: Policy + Clone, + S: tower_service::Service, + Error: From, { - type Item = S::Response; - type Error = Error; + type Output = Result; - fn poll(&mut self) -> Poll { - self.0.poll() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx).map_err(Into::into) } } @@ -213,19 +220,19 @@ where P: Policy, { type Future = future::Either< - FutureResult<(), tower_filter::error::Error>, - future::Empty<(), tower_filter::error::Error>, + future::Ready>, + future::Pending>, >; fn check(&mut self, request: &Request) -> Self::Future { if self.0.can_retry(request) { - future::Either::A(future::ok(())) + future::Either::Left(future::ready(Ok(()))) } else { // If the hedge retry should not be issued, we simply want to wait // for the result of the original request. Therefore we don't want - // to return an error here. Instead, we use future::empty to ensure + // to return an error here. Instead, we use future::pending to ensure // that the original request wins the select. - future::Either::B(future::empty()) + future::Either::Right(future::pending()) } } } diff --git a/tower-hedge/src/rotating_histogram.rs b/tower-hedge/src/rotating_histogram.rs index 7caea77..0fbb965 100644 --- a/tower-hedge/src/rotating_histogram.rs +++ b/tower-hedge/src/rotating_histogram.rs @@ -1,8 +1,8 @@ extern crate tokio_timer; use hdrhistogram::Histogram; +use log::trace; use std::time::{Duration, Instant}; - use tokio_timer::clock; /// This represents a "rotating" histogram which stores two histogram, one which diff --git a/tower-hedge/src/select.rs b/tower-hedge/src/select.rs index a78a6a1..54d6f09 100644 --- a/tower-hedge/src/select.rs +++ b/tower-hedge/src/select.rs @@ -1,4 +1,9 @@ -use futures::{Async, Future, Poll}; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// A policy which decides which requests can be cloned and sent to the B @@ -18,9 +23,12 @@ pub struct Select { b: B, } +#[pin_project] #[derive(Debug)] pub struct ResponseFuture { + #[pin] a_fut: AF, + #[pin] b_fut: Option, } @@ -41,21 +49,20 @@ impl Service for Select where P: Policy, A: Service, - A::Error: Into, + super::Error: From, B: Service, - B::Error: Into, + super::Error: From, { type Response = A::Response; type Error = super::Error; type Future = ResponseFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let a = self.a.poll_ready().map_err(|e| e.into())?; - let b = self.b.poll_ready().map_err(|e| e.into())?; - if a.is_ready() && b.is_ready() { - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match (self.a.poll_ready(cx), self.b.poll_ready(cx)) { + (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), + (Poll::Ready(Err(e)), _) => Poll::Ready(Err(e.into())), + (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e.into())), + _ => Poll::Pending, } } @@ -72,29 +79,26 @@ where } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - AF: Future, - AF::Error: Into, - BF: Future, - BF::Error: Into, + AF: Future>, + super::Error: From, + BF: Future>, + super::Error: From, { - type Item = AF::Item; - type Error = super::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.a_fut.poll() { - Ok(Async::NotReady) => {} - Ok(Async::Ready(a)) => return Ok(Async::Ready(a)), - Err(e) => return Err(e.into()), + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if let Poll::Ready(r) = this.a_fut.poll(cx) { + return Poll::Ready(Ok(r?)); } - if let Some(ref mut b_fut) = self.b_fut { - match b_fut.poll() { - Ok(Async::NotReady) => {} - Ok(Async::Ready(b)) => return Ok(Async::Ready(b)), - Err(e) => return Err(e.into()), + if let Some(b_fut) = this.b_fut.as_pin_mut() { + if let Poll::Ready(r) = b_fut.poll(cx) { + return Poll::Ready(Ok(r?)); } } - return Ok(Async::NotReady); + return Poll::Pending; } } diff --git a/tower-hedge/tests/hedge.rs b/tower-hedge/tests/hedge.rs index a86c76c..99e29ab 100644 --- a/tower-hedge/tests/hedge.rs +++ b/tower-hedge/tests/hedge.rs @@ -1,142 +1,153 @@ -extern crate futures; -extern crate tokio_executor; -extern crate tokio_mock_task; -extern crate tokio_timer; -extern crate tower_hedge as hedge; -extern crate tower_service; -extern crate tower_test; - -#[macro_use] -mod support; -use support::*; - -use futures::Future; -use hedge::{Hedge, Policy}; -use tower_service::Service; - +use futures_util::pin_mut; +use std::future::Future; use std::time::Duration; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task}; +use tower_hedge::{Hedge, Policy}; +use tower_service::Service; +use tower_test::assert_request_eq; #[test] fn hedge_orig_completes_first() { - let (mut service, mut handle) = new_service(TestPolicy); + task::mock(|cx| { + clock::mock(|time| { + let (mut service, handle) = new_service(TestPolicy); + pin_mut!(handle); - mocked(|timer, _| { - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call("orig"); - // Check that orig request has been issued. - let (_, req) = handle.next_request().expect("orig"); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call("orig"); + pin_mut!(fut); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); - advance(timer, ms(10)); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); - // Check that the hedge has been issued. - let (_, _hedge_req) = handle.next_request().expect("hedge"); + // Check that orig request has been issued. + let req = assert_request_eq!(handle, "orig"); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); - req.send_response("orig-done"); - // Check that fut gets orig response. - assert_eq!(fut.wait().unwrap(), "orig-done"); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); + time.advance(Duration::from_millis(10)); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); + // Check that the hedge has been issued. + let _hedge_req = assert_request_eq!(handle, "orig"); + + req.send_response("orig-done"); + // Check that fut gets orig response. + assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done"); + }); }); } #[test] fn hedge_hedge_completes_first() { - let (mut service, mut handle) = new_service(TestPolicy); + task::mock(|cx| { + clock::mock(|time| { + let (mut service, handle) = new_service(TestPolicy); + pin_mut!(handle); - mocked(|timer, _| { - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call("orig"); - // Check that orig request has been issued. - let (_, _req) = handle.next_request().expect("orig"); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call("orig"); + pin_mut!(fut); + // Check that orig request has been issued. + let _req = assert_request_eq!(handle, "orig"); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); - advance(timer, ms(10)); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); + time.advance(Duration::from_millis(10)); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); - // Check that the hedge has been issued. - let (_, hedge_req) = handle.next_request().expect("hedge"); - hedge_req.send_response("hedge-done"); - // Check that fut gets hedge response. - assert_eq!(fut.wait().unwrap(), "hedge-done"); + // Check that the hedge has been issued. + let hedge_req = assert_request_eq!(handle, "orig"); + hedge_req.send_response("hedge-done"); + // Check that fut gets hedge response. + assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "hedge-done"); + }); }); } #[test] fn completes_before_hedge() { - let (mut service, mut handle) = new_service(TestPolicy); + task::mock(|cx| { + clock::mock(|_| { + let (mut service, handle) = new_service(TestPolicy); + pin_mut!(handle); - mocked(|_, _| { - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call("orig"); - // Check that orig request has been issued. - let (_, req) = handle.next_request().expect("orig"); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call("orig"); + pin_mut!(fut); + // Check that orig request has been issued. + let req = assert_request_eq!(handle, "orig"); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); - req.send_response("orig-done"); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); - // Check that fut gets orig response. - assert_eq!(fut.wait().unwrap(), "orig-done"); + req.send_response("orig-done"); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); + // Check that fut gets orig response. + assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done"); + }); }); } #[test] fn request_not_retyable() { - let (mut service, mut handle) = new_service(TestPolicy); + task::mock(|cx| { + clock::mock(|time| { + let (mut service, handle) = new_service(TestPolicy); + pin_mut!(handle); - mocked(|timer, _| { - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call(NOT_RETRYABLE); - // Check that orig request has been issued. - let (_, req) = handle.next_request().expect("orig"); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call(NOT_RETRYABLE); + pin_mut!(fut); + // Check that orig request has been issued. + let req = assert_request_eq!(handle, NOT_RETRYABLE); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); - advance(timer, ms(10)); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); + time.advance(Duration::from_millis(10)); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); - req.send_response("orig-done"); - // Check that fut gets orig response. - assert_eq!(fut.wait().unwrap(), "orig-done"); + req.send_response("orig-done"); + // Check that fut gets orig response. + assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done"); + }); }); } #[test] fn request_not_clonable() { - let (mut service, mut handle) = new_service(TestPolicy); + task::mock(|cx| { + clock::mock(|time| { + let (mut service, handle) = new_service(TestPolicy); + pin_mut!(handle); - mocked(|timer, _| { - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call(NOT_CLONABLE); - // Check that orig request has been issued. - let (_, req) = handle.next_request().expect("orig"); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call(NOT_CLONABLE); + pin_mut!(fut); + // Check that orig request has been issued. + let req = assert_request_eq!(handle, NOT_CLONABLE); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); - advance(timer, ms(10)); - // Check fut is not ready. - assert!(fut.poll().unwrap().is_not_ready()); - // Check hedge has not been issued. - assert!(handle.poll_request().unwrap().is_not_ready()); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); + time.advance(Duration::from_millis(10)); + // Check fut is not ready. + assert_pending!(fut.as_mut().poll(cx)); + // Check hedge has not been issued. + assert_pending!(handle.as_mut().poll_request(cx)); - req.send_response("orig-done"); - // Check that fut gets orig response. - assert_eq!(fut.wait().unwrap(), "orig-done"); + req.send_response("orig-done"); + // Check that fut gets orig response. + assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done"); + }); }); } @@ -151,7 +162,7 @@ static NOT_CLONABLE: &'static str = "NOT_CLONABLE"; #[derive(Clone)] struct TestPolicy; -impl hedge::Policy for TestPolicy { +impl tower_hedge::Policy for TestPolicy { fn can_retry(&self, req: &Req) -> bool { *req != NOT_RETRYABLE } diff --git a/tower-hedge/tests/support/mod.rs b/tower-hedge/tests/support/mod.rs deleted file mode 100644 index 01955f8..0000000 --- a/tower-hedge/tests/support/mod.rs +++ /dev/null @@ -1,264 +0,0 @@ -// Shamelessly copied verbatim from -// https://github.com/tokio-rs/tokio/blob/master/tokio-timer/tests/support/mod.rs - -#![allow(unused_macros, unused_imports, dead_code, deprecated)] - -use tokio_executor::park::{Park, Unpark}; -use tokio_timer::clock::Now; -use tokio_timer::timer::Timer; - -use futures::future::{lazy, Future}; - -use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; - -macro_rules! assert_ready { - ($f:expr) => {{ - use ::futures::Async::*; - - match $f.poll().unwrap() { - Ready(v) => v, - NotReady => panic!("NotReady"), - } - }}; - ($f:expr, $($msg:expr),+) => {{ - use ::futures::Async::*; - - match $f.poll().unwrap() { - Ready(v) => v, - NotReady => { - let msg = format!($($msg),+); - panic!("NotReady; {}", msg) - } - } - }} -} - -macro_rules! assert_ready_eq { - ($f:expr, $expect:expr) => { - assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect)); - }; -} - -macro_rules! assert_not_ready { - ($f:expr) => {{ - let res = $f.poll().unwrap(); - assert!(!res.is_ready(), "actual={:?}", res) - }}; - ($f:expr, $($msg:expr),+) => {{ - let res = $f.poll().unwrap(); - if res.is_ready() { - let msg = format!($($msg),+); - panic!("actual={:?}; {}", res, msg); - } - }}; -} - -macro_rules! assert_elapsed { - ($f:expr) => { - assert!($f.poll().unwrap_err().is_elapsed()); - }; -} - -#[derive(Debug)] -pub struct MockTime { - inner: Inner, - _p: PhantomData>, -} - -#[derive(Debug)] -pub struct MockNow { - inner: Inner, -} - -#[derive(Debug)] -pub struct MockPark { - inner: Inner, - _p: PhantomData>, -} - -#[derive(Debug)] -pub struct MockUnpark { - inner: Inner, -} - -type Inner = Arc>; - -#[derive(Debug)] -struct State { - base: Instant, - advance: Duration, - unparked: bool, - park_for: Option, -} - -pub fn ms(num: u64) -> Duration { - Duration::from_millis(num) -} - -pub trait IntoTimeout { - fn into_timeout(self) -> Option; -} - -impl IntoTimeout for Option { - fn into_timeout(self) -> Self { - self - } -} - -impl IntoTimeout for Duration { - fn into_timeout(self) -> Option { - Some(self) - } -} - -/// Turn the timer state once -pub fn turn(timer: &mut Timer, duration: T) { - timer.turn(duration.into_timeout()).unwrap(); -} - -/// Advance the timer the specified amount -pub fn advance(timer: &mut Timer, duration: Duration) { - let inner = timer.get_park().inner.clone(); - let deadline = inner.lock().unwrap().now() + duration; - - while inner.lock().unwrap().now() < deadline { - let dur = deadline - inner.lock().unwrap().now(); - turn(timer, dur); - } -} - -pub fn mocked(f: F) -> R -where - F: FnOnce(&mut Timer, &mut MockTime) -> R, -{ - mocked_with_now(Instant::now(), f) -} - -pub fn mocked_with_now(now: Instant, f: F) -> R -where - F: FnOnce(&mut Timer, &mut MockTime) -> R, -{ - let mut time = MockTime::new(now); - let park = time.mock_park(); - let now = ::tokio_timer::clock::Clock::new_with_now(time.mock_now()); - - let mut enter = ::tokio_executor::enter().unwrap(); - - ::tokio_timer::clock::with_default(&now, &mut enter, |enter| { - let mut timer = Timer::new(park); - let handle = timer.handle(); - - ::tokio_timer::with_default(&handle, enter, |_| { - lazy(|| Ok::<_, ()>(f(&mut timer, &mut time))) - .wait() - .unwrap() - }) - }) -} - -impl MockTime { - pub fn new(now: Instant) -> MockTime { - let state = State { - base: now, - advance: Duration::default(), - unparked: false, - park_for: None, - }; - - MockTime { - inner: Arc::new(Mutex::new(state)), - _p: PhantomData, - } - } - - pub fn mock_now(&self) -> MockNow { - let inner = self.inner.clone(); - MockNow { inner } - } - - pub fn mock_park(&self) -> MockPark { - let inner = self.inner.clone(); - MockPark { - inner, - _p: PhantomData, - } - } - - pub fn now(&self) -> Instant { - self.inner.lock().unwrap().now() - } - - /// Returns the total amount of time the time has been advanced. - pub fn advanced(&self) -> Duration { - self.inner.lock().unwrap().advance - } - - pub fn advance(&self, duration: Duration) { - let mut inner = self.inner.lock().unwrap(); - inner.advance(duration); - } - - /// The next call to park_timeout will be for this duration, regardless of - /// the timeout passed to `park_timeout`. - pub fn park_for(&self, duration: Duration) { - self.inner.lock().unwrap().park_for = Some(duration); - } -} - -impl Park for MockPark { - type Unpark = MockUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - let inner = self.inner.clone(); - MockUnpark { inner } - } - - fn park(&mut self) -> Result<(), Self::Error> { - let mut inner = self.inner.lock().map_err(|_| ())?; - - let duration = inner.park_for.take().expect("call park_for first"); - - inner.advance(duration); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - let mut inner = self.inner.lock().unwrap(); - - if let Some(duration) = inner.park_for.take() { - inner.advance(duration); - } else { - inner.advance(duration); - } - - Ok(()) - } -} - -impl Unpark for MockUnpark { - fn unpark(&self) { - if let Ok(mut inner) = self.inner.lock() { - inner.unparked = true; - } - } -} - -impl Now for MockNow { - fn now(&self) -> Instant { - self.inner.lock().unwrap().now() - } -} - -impl State { - fn now(&self) -> Instant { - self.base + self.advance - } - - fn advance(&mut self, duration: Duration) { - self.advance += duration; - } -} diff --git a/tower-spawn-ready/Cargo.toml b/tower-spawn-ready/Cargo.toml index 6fa8c1a..2634654 100644 --- a/tower-spawn-ready/Cargo.toml +++ b/tower-spawn-ready/Cargo.toml @@ -28,7 +28,6 @@ futures-util-preview = "=0.3.0-alpha.18" pin-project = "=0.4.0-alpha.11" tower-service = "=0.3.0-alpha.1" tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" } -tower-util = { version = "=0.3.0-alpha.1", path = "../tower-util" } tokio-executor = "=0.2.0-alpha.4" tokio-sync = "=0.2.0-alpha.4" diff --git a/tower-spawn-ready/src/future.rs b/tower-spawn-ready/src/future.rs index 8ccc760..fb31641 100644 --- a/tower-spawn-ready/src/future.rs +++ b/tower-spawn-ready/src/future.rs @@ -3,6 +3,7 @@ use crate::error::Error; use futures_core::ready; use pin_project::pin_project; +use std::marker::PhantomData; use std::{ future::Future, pin::Pin, @@ -11,18 +12,13 @@ use std::{ use tokio_executor::TypedExecutor; use tokio_sync::oneshot; use tower_service::Service; -use tower_util::Ready; #[pin_project] /// Drives a service to readiness. -pub struct BackgroundReady -where - T: Service, - T::Error: Into, -{ - #[pin] - ready: Ready, +pub struct BackgroundReady { + service: Option, tx: Option>>, + _req: PhantomData, } /// This trait allows you to use either Tokio's threaded runtime's executor or @@ -55,8 +51,9 @@ where { let (tx, rx) = oneshot::channel(); let bg = BackgroundReady { - ready: Ready::new(service), + service: Some(service), tx: Some(tx), + _req: PhantomData, }; (bg, rx) } @@ -75,7 +72,9 @@ where return Poll::Ready(()); } - let result = ready!(this.ready.poll(cx)); + let result = ready!(this.service.as_mut().expect("illegal state").poll_ready(cx)) + .map(|()| this.service.take().expect("illegal state")); + let _ = this .tx .take() diff --git a/tower-util/src/ready.rs b/tower-util/src/ready.rs index 3ce57be..cc11628 100644 --- a/tower-util/src/ready.rs +++ b/tower-util/src/ready.rs @@ -13,42 +13,38 @@ use tower_service::Service; /// /// `Ready` values are produced by `ServiceExt::ready`. #[pin_project] -pub struct Ready { - inner: Option, +pub struct Ready<'a, T, Request> { + inner: &'a mut T, _p: PhantomData Request>, } -impl Ready +impl<'a, T, Request> Ready<'a, T, Request> where T: Service, { - pub fn new(service: T) -> Self { + pub fn new(service: &'a mut T) -> Self { Ready { - inner: Some(service), + inner: service, _p: PhantomData, } } } -impl Future for Ready +impl<'a, T, Request> Future for Ready<'a, T, Request> where T: Service, { - type Output = Result; + type Output = Result<(), T::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - ready!(this - .inner - .as_mut() - .expect("called `poll` after future completed") - .poll_ready(cx))?; + ready!(this.inner.poll_ready(cx))?; - Poll::Ready(Ok(this.inner.take().unwrap())) + Poll::Ready(Ok(())) } } -impl fmt::Debug for Ready +impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request> where T: fmt::Debug, { diff --git a/tower/src/util.rs b/tower/src/util.rs index 237188f..8eea103 100644 --- a/tower/src/util.rs +++ b/tower/src/util.rs @@ -14,7 +14,7 @@ type Error = Box; /// adapters pub trait ServiceExt: Service { /// A future yielding the service when it is ready to accept a request. - fn ready(self) -> Ready + fn ready(&mut self) -> Ready<'_, Self, Request> where Self: Sized, { diff --git a/tower/tests/builder.rs b/tower/tests/builder.rs index 5cbae48..cbb76e8 100644 --- a/tower/tests/builder.rs +++ b/tower/tests/builder.rs @@ -17,7 +17,7 @@ async fn builder_service() { pin_mut!(handle); let policy = MockPolicy; - let client = ServiceBuilder::new() + let mut client = ServiceBuilder::new() .layer(BufferLayer::new(5)) .layer(ConcurrencyLimitLayer::new(5)) .layer(RateLimitLayer::new(5, Duration::from_secs(1))) @@ -28,7 +28,7 @@ async fn builder_service() { // allow a request through handle.allow(1); - let mut client = client.ready().await.unwrap(); + client.ready().await.unwrap(); let fut = client.call("hello"); let (request, rsp) = poll_fn(|cx| handle.as_mut().poll_request(cx)) .await