Merge branch 'v0.3.x' of github.com:tower-rs/tower into v0.3.x

This commit is contained in:
Lucio Franco 2019-09-11 15:57:32 -04:00
commit 04fd0c5898
No known key found for this signature in database
GPG Key ID: 20DB2D1FCD10BDC1
16 changed files with 304 additions and 532 deletions

View File

@ -6,7 +6,7 @@ members = [
"tower-buffer", "tower-buffer",
"tower-discover", "tower-discover",
"tower-filter", "tower-filter",
# "tower-hedge", "tower-hedge",
"tower-layer", "tower-layer",
"tower-limit", "tower-limit",
"tower-load", "tower-load",

View File

@ -40,7 +40,6 @@ tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tower-load = { version = "=0.3.0-alpha.1", path = "../tower-load" } tower-load = { version = "=0.3.0-alpha.1", path = "../tower-load" }
tower-service = "=0.3.0-alpha.1" tower-service = "=0.3.0-alpha.1"
tower-make = { version = "=0.3.0-alpha.1", path = "../tower-make" } tower-make = { version = "=0.3.0-alpha.1", path = "../tower-make" }
tower-util = { version = "=0.3.0-alpha.1", path = "../tower-util" }
slab = "0.4" slab = "0.4"
[dev-dependencies] [dev-dependencies]

View File

@ -4,6 +4,7 @@ use futures_util::{stream, try_future, try_future::TryFutureExt};
use indexmap::IndexMap; use indexmap::IndexMap;
use pin_project::pin_project; use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy}; use rand::{rngs::SmallRng, FromEntropy};
use std::marker::PhantomData;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -13,7 +14,6 @@ use tokio_sync::oneshot;
use tower_discover::{Change, Discover}; use tower_discover::{Change, Discover};
use tower_load::Load; use tower_load::Load;
use tower_service::Service; use tower_service::Service;
use tower_util::Ready;
use tracing::{debug, trace}; use tracing::{debug, trace};
/// Distributes requests across inner services using the [Power of Two Choices][p2c]. /// Distributes requests across inner services using the [Power of Two Choices][p2c].
@ -50,6 +50,8 @@ pub struct Balance<D: Discover, Req> {
next_ready_index: Option<usize>, next_ready_index: Option<usize>,
rng: SmallRng, rng: SmallRng,
_req: PhantomData<Req>,
} }
#[pin_project] #[pin_project]
@ -61,8 +63,9 @@ struct UnreadyService<K, S, Req> {
key: Option<K>, key: Option<K>,
#[pin] #[pin]
cancel: oneshot::Receiver<()>, cancel: oneshot::Receiver<()>,
#[pin] service: Option<S>,
ready: tower_util::Ready<S, Req>,
_req: PhantomData<Req>,
} }
enum Error<E> { enum Error<E> {
@ -84,6 +87,8 @@ where
cancelations: IndexMap::default(), cancelations: IndexMap::default(),
unready_services: stream::FuturesUnordered::new(), unready_services: stream::FuturesUnordered::new(),
next_ready_index: None, next_ready_index: None,
_req: PhantomData,
} }
} }
@ -139,8 +144,9 @@ where
self.cancelations.insert(key.clone(), tx); self.cancelations.insert(key.clone(), tx);
self.unready_services.push(UnreadyService { self.unready_services.push(UnreadyService {
key: Some(key), key: Some(key),
ready: Ready::new(svc), service: Some(svc),
cancel: rx, cancel: rx,
_req: PhantomData,
}); });
} }
@ -342,15 +348,18 @@ impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
return Poll::Ready(Err((key, Error::Canceled))); return Poll::Ready(Err((key, Error::Canceled)));
} }
match ready!(this.ready.poll(cx)) { let res = ready!(this
Ok(svc) => { .service
let key = this.key.take().expect("polled after ready"); .as_mut()
Poll::Ready(Ok((key, svc))) .expect("poll after ready")
} .poll_ready(cx));
Err(e) => {
let key = this.key.take().expect("polled after ready"); let key = this.key.take().expect("polled after ready");
Poll::Ready(Err((key, Error::Inner(e)))) let svc = this.service.take().expect("polled after ready");
}
match res {
Ok(()) => Poll::Ready(Ok((key, svc))),
Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
} }
} }
} }

View File

@ -1,18 +1,20 @@
[package] [package]
name = "tower-hedge" name = "tower-hedge"
version = "0.1.0" version = "0.3.0-alpha.1"
authors = ["Alex Leong <adlleong@gmail.com>"] authors = ["Alex Leong <adlleong@gmail.com>"]
edition = "2018"
publish = false publish = false
[dependencies] [dependencies]
futures = "0.1"
hdrhistogram = "6.0" hdrhistogram = "6.0"
log = "0.4.1" log = "0.4.1"
tower-service = "0.2.0" tower-service = "0.3.0-alpha.1"
tower-filter = { version = "0.1", path = "../tower-filter" } tower-filter = { version = "0.3.0-alpha.1", path = "../tower-filter" }
tokio-mock-task = { git = "https://github.com/carllerche/tokio-mock-task" } tokio-timer = "0.3.0-alpha.4"
tokio-timer = "0.2.6" futures-util-preview = "0.3.0-alpha.18"
pin-project = "0.4.0-alpha.10"
[dev-dependencies] [dev-dependencies]
tower-test = { version = "0.1", path = "../tower-test" } tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-executor = "0.1.2" tokio-test = "0.2.0-alpha.4"
tokio-executor = "0.2.0-alpha.4"

View File

@ -1,7 +1,12 @@
use futures::{Async, Future, Poll}; use futures_util::ready;
use tower_service::Service; use pin_project::{pin_project, project};
use std::time::Duration; use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// A policy which specifies how long each request should be delayed for. /// A policy which specifies how long each request should be delayed for.
pub trait Policy<Request> { pub trait Policy<Request> {
@ -16,16 +21,19 @@ pub struct Delay<P, S> {
service: S, service: S,
} }
#[pin_project]
#[derive(Debug)] #[derive(Debug)]
pub struct ResponseFuture<Request, S, F> { pub struct ResponseFuture<Request, S, F> {
service: S, service: S,
#[pin]
state: State<Request, F>, state: State<Request, F>,
} }
#[pin_project]
#[derive(Debug)] #[derive(Debug)]
enum State<Request, F> { enum State<Request, F> {
Delaying(tokio_timer::Delay, Option<Request>), Delaying(#[pin] tokio_timer::Delay, Option<Request>),
Called(F), Called(#[pin] F),
} }
impl<P, S> Delay<P, S> { impl<P, S> Delay<P, S> {
@ -49,8 +57,8 @@ where
type Error = super::Error; type Error = super::Error;
type Future = ResponseFuture<Request, S, S::Future>; type Future = ResponseFuture<Request, S, S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready().map_err(|e| e.into()) self.service.poll_ready(cx).map_err(|e| e.into())
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {
@ -63,37 +71,36 @@ where
}; };
ResponseFuture { ResponseFuture {
service: orig, service: orig,
state: State::Delaying(tokio_timer::Delay::new(deadline), Some(request)), state: State::Delaying(tokio_timer::delay(deadline), Some(request)),
} }
} }
} }
impl<Request, S, F> Future for ResponseFuture<Request, S, F> impl<Request, S, F, T, E> Future for ResponseFuture<Request, S, F>
where where
F: Future, F: Future<Output = Result<T, E>>,
F::Error: Into<super::Error>, E: Into<super::Error>,
S: Service<Request, Future = F, Response = F::Item, Error = F::Error>, S: Service<Request, Future = F, Response = T, Error = E>,
{ {
type Item = F::Item; type Output = Result<T, super::Error>;
type Error = super::Error;
#[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
let next = match self.state { #[project]
State::Delaying(ref mut delay, ref mut req) => match delay.poll() { match this.state.project() {
Ok(Async::NotReady) => return Ok(Async::NotReady), State::Delaying(delay, req) => {
Ok(Async::Ready(())) => { ready!(delay.poll(cx));
let req = req.take().expect("Missing request in delay"); let req = req.take().expect("Missing request in delay");
let fut = self.service.call(req); let fut = this.service.call(req);
State::Called(fut) this.state.set(State::Called(fut));
} }
Err(e) => return Err(e.into()), State::Called(fut) => {
}, return fut.poll(cx).map_err(Into::into);
State::Called(ref mut fut) => {
return fut.poll().map_err(|e| e.into());
} }
}; };
self.state = next;
} }
} }
} }

View File

@ -1,9 +1,14 @@
use futures::{Async, Future, Poll}; use futures_util::ready;
use pin_project::pin_project;
use std::time::{Duration, Instant};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_timer::clock; use tokio_timer::clock;
use tower_service::Service; use tower_service::Service;
use std::time::{Duration, Instant};
/// Record is the interface for accepting request latency measurements. When /// Record is the interface for accepting request latency measurements. When
/// a request completes, record is called with the elapsed duration between /// a request completes, record is called with the elapsed duration between
/// when the service was called and when the future completed. /// when the service was called and when the future completed.
@ -19,10 +24,12 @@ pub struct Latency<R, S> {
service: S, service: S,
} }
#[pin_project]
#[derive(Debug)] #[derive(Debug)]
pub struct ResponseFuture<R, F> { pub struct ResponseFuture<R, F> {
start: Instant, start: Instant,
rec: R, rec: R,
#[pin]
inner: F, inner: F,
} }
@ -42,15 +49,15 @@ where
impl<S, R, Request> Service<Request> for Latency<R, S> impl<S, R, Request> Service<Request> for Latency<R, S>
where where
S: Service<Request>, S: Service<Request>,
S::Error: Into<super::Error>, super::Error: From<S::Error>,
R: Record + Clone, R: Record + Clone,
{ {
type Response = S::Response; type Response = S::Response;
type Error = super::Error; type Error = super::Error;
type Future = ResponseFuture<R, S::Future>; type Future = ResponseFuture<R, S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready().map_err(|e| e.into()) self.service.poll_ready(cx).map_err(|e| e.into())
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {
@ -62,24 +69,20 @@ where
} }
} }
impl<R, F> Future for ResponseFuture<R, F> impl<R, F, T, E> Future for ResponseFuture<R, F>
where where
R: Record, R: Record,
F: Future, F: Future<Output = Result<T, E>>,
F::Error: Into<super::Error>, super::Error: From<E>,
{ {
type Item = F::Item; type Output = Result<T, super::Error>;
type Error = super::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.poll() { let this = self.project();
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(rsp)) => { let rsp = ready!(this.inner.poll(cx))?;
let duration = clock::now() - self.start; let duration = clock::now() - *this.start;
self.rec.record(duration); this.rec.record(duration);
Ok(Async::Ready(rsp)) Poll::Ready(Ok(rsp))
}
Err(e) => Err(e.into()),
}
} }
} }

View File

@ -3,18 +3,16 @@
#![deny(warnings)] #![deny(warnings)]
#![deny(missing_docs)] #![deny(missing_docs)]
extern crate futures;
extern crate hdrhistogram;
#[macro_use]
extern crate log;
extern crate tokio_timer;
extern crate tower_filter;
extern crate tower_service;
use futures::future::FutureResult; use futures_util::future;
use futures::{future, Poll}; use log::error;
use pin_project::pin_project;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_filter::Filter; use tower_filter::Filter;
mod delay; mod delay;
@ -38,12 +36,16 @@ type Service<S, P> = select::Select<
/// future or the retry future completes, that value is used. /// future or the retry future completes, that value is used.
#[derive(Debug)] #[derive(Debug)]
pub struct Hedge<S, P>(Service<S, P>); pub struct Hedge<S, P>(Service<S, P>);
#[pin_project]
/// The Future returned by the hedge Service. /// The Future returned by the hedge Service.
pub struct Future<S, P, Request>(<Service<S, P> as tower_service::Service<Request>>::Future) pub struct Future<S, Request>
where where
S: tower_service::Service<Request> + Clone, S: tower_service::Service<Request>,
S::Error: Into<Error>, {
P: Policy<Request> + Clone; #[pin]
inner: S::Future,
}
type Error = Box<dyn std::error::Error + Send + Sync>; type Error = Box<dyn std::error::Error + Send + Sync>;
@ -58,15 +60,20 @@ pub trait Policy<Request> {
fn can_retry(&self, req: &Request) -> bool; fn can_retry(&self, req: &Request) -> bool;
} }
// NOTE: these are pub only because they appear inside a Future<F>
#[doc(hidden)]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct PolicyPredicate<P>(P); pub struct PolicyPredicate<P>(P);
#[doc(hidden)]
#[derive(Debug)] #[derive(Debug)]
struct DelayPolicy { pub struct DelayPolicy {
histo: Histo, histo: Histo,
latency_percentile: f32, latency_percentile: f32,
} }
#[doc(hidden)]
#[derive(Debug)] #[derive(Debug)]
struct SelectPolicy<P> { pub struct SelectPolicy<P> {
policy: P, policy: P,
histo: Histo, histo: Histo,
min_data_points: u64, min_data_points: u64,
@ -83,7 +90,7 @@ impl<S, P> Hedge<S, P> {
) -> Hedge<S, P> ) -> Hedge<S, P>
where where
S: tower_service::Service<Request> + Clone, S: tower_service::Service<Request> + Clone,
S::Error: Into<Error>, Error: From<S::Error>,
P: Policy<Request> + Clone, P: Policy<Request> + Clone,
{ {
let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
@ -102,7 +109,7 @@ impl<S, P> Hedge<S, P> {
) -> Hedge<S, P> ) -> Hedge<S, P>
where where
S: tower_service::Service<Request> + Clone, S: tower_service::Service<Request> + Clone,
S::Error: Into<Error>, Error: From<S::Error>,
P: Policy<Request> + Clone, P: Policy<Request> + Clone,
{ {
let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
@ -124,7 +131,7 @@ impl<S, P> Hedge<S, P> {
) -> Hedge<S, P> ) -> Hedge<S, P>
where where
S: tower_service::Service<Request> + Clone, S: tower_service::Service<Request> + Clone,
S::Error: Into<Error>, Error: From<S::Error>,
P: Policy<Request> + Clone, P: Policy<Request> + Clone,
{ {
// Clone the underlying service and wrap both copies in a middleware that // Clone the underlying service and wrap both copies in a middleware that
@ -157,33 +164,33 @@ impl<S, P> Hedge<S, P> {
impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P> impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
where where
S: tower_service::Service<Request> + Clone, S: tower_service::Service<Request> + Clone,
S::Error: Into<Error>, Error: From<S::Error>,
P: Policy<Request> + Clone, P: Policy<Request> + Clone,
{ {
type Response = S::Response; type Response = S::Response;
type Error = Error; type Error = Error;
type Future = Future<S, P, Request>; type Future = Future<Service<S, P>, Request>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready() self.0.poll_ready(cx)
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {
Future(self.0.call(request)) Future {
inner: self.0.call(request),
}
} }
} }
impl<S, P, Request> futures::Future for Future<S, P, Request> impl<S, Request> std::future::Future for Future<S, Request>
where where
S: tower_service::Service<Request> + Clone, S: tower_service::Service<Request>,
S::Error: Into<Error>, Error: From<S::Error>,
P: Policy<Request> + Clone,
{ {
type Item = S::Response; type Output = Result<S::Response, Error>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll() self.project().inner.poll(cx).map_err(Into::into)
} }
} }
@ -213,19 +220,19 @@ where
P: Policy<Request>, P: Policy<Request>,
{ {
type Future = future::Either< type Future = future::Either<
FutureResult<(), tower_filter::error::Error>, future::Ready<Result<(), tower_filter::error::Error>>,
future::Empty<(), tower_filter::error::Error>, future::Pending<Result<(), tower_filter::error::Error>>,
>; >;
fn check(&mut self, request: &Request) -> Self::Future { fn check(&mut self, request: &Request) -> Self::Future {
if self.0.can_retry(request) { if self.0.can_retry(request) {
future::Either::A(future::ok(())) future::Either::Left(future::ready(Ok(())))
} else { } else {
// If the hedge retry should not be issued, we simply want to wait // If the hedge retry should not be issued, we simply want to wait
// for the result of the original request. Therefore we don't want // for the result of the original request. Therefore we don't want
// to return an error here. Instead, we use future::empty to ensure // to return an error here. Instead, we use future::pending to ensure
// that the original request wins the select. // that the original request wins the select.
future::Either::B(future::empty()) future::Either::Right(future::pending())
} }
} }
} }

View File

@ -1,8 +1,8 @@
extern crate tokio_timer; extern crate tokio_timer;
use hdrhistogram::Histogram; use hdrhistogram::Histogram;
use log::trace;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio_timer::clock; use tokio_timer::clock;
/// This represents a "rotating" histogram which stores two histogram, one which /// This represents a "rotating" histogram which stores two histogram, one which

View File

@ -1,4 +1,9 @@
use futures::{Async, Future, Poll}; use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service; use tower_service::Service;
/// A policy which decides which requests can be cloned and sent to the B /// A policy which decides which requests can be cloned and sent to the B
@ -18,9 +23,12 @@ pub struct Select<P, A, B> {
b: B, b: B,
} }
#[pin_project]
#[derive(Debug)] #[derive(Debug)]
pub struct ResponseFuture<AF, BF> { pub struct ResponseFuture<AF, BF> {
#[pin]
a_fut: AF, a_fut: AF,
#[pin]
b_fut: Option<BF>, b_fut: Option<BF>,
} }
@ -41,21 +49,20 @@ impl<P, A, B, Request> Service<Request> for Select<P, A, B>
where where
P: Policy<Request>, P: Policy<Request>,
A: Service<Request>, A: Service<Request>,
A::Error: Into<super::Error>, super::Error: From<A::Error>,
B: Service<Request, Response = A::Response>, B: Service<Request, Response = A::Response>,
B::Error: Into<super::Error>, super::Error: From<B::Error>,
{ {
type Response = A::Response; type Response = A::Response;
type Error = super::Error; type Error = super::Error;
type Future = ResponseFuture<A::Future, B::Future>; type Future = ResponseFuture<A::Future, B::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let a = self.a.poll_ready().map_err(|e| e.into())?; match (self.a.poll_ready(cx), self.b.poll_ready(cx)) {
let b = self.b.poll_ready().map_err(|e| e.into())?; (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
if a.is_ready() && b.is_ready() { (Poll::Ready(Err(e)), _) => Poll::Ready(Err(e.into())),
Ok(Async::Ready(())) (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e.into())),
} else { _ => Poll::Pending,
Ok(Async::NotReady)
} }
} }
@ -72,29 +79,26 @@ where
} }
} }
impl<AF, BF> Future for ResponseFuture<AF, BF> impl<AF, BF, T, AE, BE> Future for ResponseFuture<AF, BF>
where where
AF: Future, AF: Future<Output = Result<T, AE>>,
AF::Error: Into<super::Error>, super::Error: From<AE>,
BF: Future<Item = AF::Item>, BF: Future<Output = Result<T, BE>>,
BF::Error: Into<super::Error>, super::Error: From<BE>,
{ {
type Item = AF::Item; type Output = Result<T, super::Error>;
type Error = super::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.a_fut.poll() { let this = self.project();
Ok(Async::NotReady) => {}
Ok(Async::Ready(a)) => return Ok(Async::Ready(a)), if let Poll::Ready(r) = this.a_fut.poll(cx) {
Err(e) => return Err(e.into()), return Poll::Ready(Ok(r?));
} }
if let Some(ref mut b_fut) = self.b_fut { if let Some(b_fut) = this.b_fut.as_pin_mut() {
match b_fut.poll() { if let Poll::Ready(r) = b_fut.poll(cx) {
Ok(Async::NotReady) => {} return Poll::Ready(Ok(r?));
Ok(Async::Ready(b)) => return Ok(Async::Ready(b)),
Err(e) => return Err(e.into()),
} }
} }
return Ok(Async::NotReady); return Poll::Pending;
} }
} }

View File

@ -1,142 +1,153 @@
extern crate futures; use futures_util::pin_mut;
extern crate tokio_executor; use std::future::Future;
extern crate tokio_mock_task;
extern crate tokio_timer;
extern crate tower_hedge as hedge;
extern crate tower_service;
extern crate tower_test;
#[macro_use]
mod support;
use support::*;
use futures::Future;
use hedge::{Hedge, Policy};
use tower_service::Service;
use std::time::Duration; use std::time::Duration;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task};
use tower_hedge::{Hedge, Policy};
use tower_service::Service;
use tower_test::assert_request_eq;
#[test] #[test]
fn hedge_orig_completes_first() { fn hedge_orig_completes_first() {
let (mut service, mut handle) = new_service(TestPolicy); task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
mocked(|timer, _| { assert_ready_ok!(service.poll_ready(cx));
assert!(service.poll_ready().unwrap().is_ready()); let fut = service.call("orig");
let mut fut = service.call("orig"); pin_mut!(fut);
// Check that orig request has been issued.
let (_, req) = handle.next_request().expect("orig");
// Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready());
// Check hedge has not been issued. // Check that orig request has been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); let req = assert_request_eq!(handle, "orig");
advance(timer, ms(10)); // Check fut is not ready.
// Check fut is not ready. assert_pending!(fut.as_mut().poll(cx));
assert!(fut.poll().unwrap().is_not_ready());
// Check that the hedge has been issued.
let (_, _hedge_req) = handle.next_request().expect("hedge");
req.send_response("orig-done"); // Check hedge has not been issued.
// Check that fut gets orig response. assert_pending!(handle.as_mut().poll_request(cx));
assert_eq!(fut.wait().unwrap(), "orig-done"); time.advance(Duration::from_millis(10));
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check that the hedge has been issued.
let _hedge_req = assert_request_eq!(handle, "orig");
req.send_response("orig-done");
// Check that fut gets orig response.
assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
}); });
} }
#[test] #[test]
fn hedge_hedge_completes_first() { fn hedge_hedge_completes_first() {
let (mut service, mut handle) = new_service(TestPolicy); task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
mocked(|timer, _| { assert_ready_ok!(service.poll_ready(cx));
assert!(service.poll_ready().unwrap().is_ready()); let fut = service.call("orig");
let mut fut = service.call("orig"); pin_mut!(fut);
// Check that orig request has been issued. // Check that orig request has been issued.
let (_, _req) = handle.next_request().expect("orig"); let _req = assert_request_eq!(handle, "orig");
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued. // Check hedge has not been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); assert_pending!(handle.as_mut().poll_request(cx));
advance(timer, ms(10)); time.advance(Duration::from_millis(10));
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
// Check that the hedge has been issued. // Check that the hedge has been issued.
let (_, hedge_req) = handle.next_request().expect("hedge"); let hedge_req = assert_request_eq!(handle, "orig");
hedge_req.send_response("hedge-done"); hedge_req.send_response("hedge-done");
// Check that fut gets hedge response. // Check that fut gets hedge response.
assert_eq!(fut.wait().unwrap(), "hedge-done"); assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "hedge-done");
});
}); });
} }
#[test] #[test]
fn completes_before_hedge() { fn completes_before_hedge() {
let (mut service, mut handle) = new_service(TestPolicy); task::mock(|cx| {
clock::mock(|_| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
mocked(|_, _| { assert_ready_ok!(service.poll_ready(cx));
assert!(service.poll_ready().unwrap().is_ready()); let fut = service.call("orig");
let mut fut = service.call("orig"); pin_mut!(fut);
// Check that orig request has been issued. // Check that orig request has been issued.
let (_, req) = handle.next_request().expect("orig"); let req = assert_request_eq!(handle, "orig");
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
req.send_response("orig-done"); req.send_response("orig-done");
// Check hedge has not been issued. // Check hedge has not been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); assert_pending!(handle.as_mut().poll_request(cx));
// Check that fut gets orig response. // Check that fut gets orig response.
assert_eq!(fut.wait().unwrap(), "orig-done"); assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
}); });
} }
#[test] #[test]
fn request_not_retyable() { fn request_not_retyable() {
let (mut service, mut handle) = new_service(TestPolicy); task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
mocked(|timer, _| { assert_ready_ok!(service.poll_ready(cx));
assert!(service.poll_ready().unwrap().is_ready()); let fut = service.call(NOT_RETRYABLE);
let mut fut = service.call(NOT_RETRYABLE); pin_mut!(fut);
// Check that orig request has been issued. // Check that orig request has been issued.
let (_, req) = handle.next_request().expect("orig"); let req = assert_request_eq!(handle, NOT_RETRYABLE);
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued. // Check hedge has not been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); assert_pending!(handle.as_mut().poll_request(cx));
advance(timer, ms(10)); time.advance(Duration::from_millis(10));
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued. // Check hedge has not been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); assert_pending!(handle.as_mut().poll_request(cx));
req.send_response("orig-done"); req.send_response("orig-done");
// Check that fut gets orig response. // Check that fut gets orig response.
assert_eq!(fut.wait().unwrap(), "orig-done"); assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
}); });
} }
#[test] #[test]
fn request_not_clonable() { fn request_not_clonable() {
let (mut service, mut handle) = new_service(TestPolicy); task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
mocked(|timer, _| { assert_ready_ok!(service.poll_ready(cx));
assert!(service.poll_ready().unwrap().is_ready()); let fut = service.call(NOT_CLONABLE);
let mut fut = service.call(NOT_CLONABLE); pin_mut!(fut);
// Check that orig request has been issued. // Check that orig request has been issued.
let (_, req) = handle.next_request().expect("orig"); let req = assert_request_eq!(handle, NOT_CLONABLE);
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued. // Check hedge has not been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); assert_pending!(handle.as_mut().poll_request(cx));
advance(timer, ms(10)); time.advance(Duration::from_millis(10));
// Check fut is not ready. // Check fut is not ready.
assert!(fut.poll().unwrap().is_not_ready()); assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued. // Check hedge has not been issued.
assert!(handle.poll_request().unwrap().is_not_ready()); assert_pending!(handle.as_mut().poll_request(cx));
req.send_response("orig-done"); req.send_response("orig-done");
// Check that fut gets orig response. // Check that fut gets orig response.
assert_eq!(fut.wait().unwrap(), "orig-done"); assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
}); });
} }
@ -151,7 +162,7 @@ static NOT_CLONABLE: &'static str = "NOT_CLONABLE";
#[derive(Clone)] #[derive(Clone)]
struct TestPolicy; struct TestPolicy;
impl hedge::Policy<Req> for TestPolicy { impl tower_hedge::Policy<Req> for TestPolicy {
fn can_retry(&self, req: &Req) -> bool { fn can_retry(&self, req: &Req) -> bool {
*req != NOT_RETRYABLE *req != NOT_RETRYABLE
} }

View File

@ -1,264 +0,0 @@
// Shamelessly copied verbatim from
// https://github.com/tokio-rs/tokio/blob/master/tokio-timer/tests/support/mod.rs
#![allow(unused_macros, unused_imports, dead_code, deprecated)]
use tokio_executor::park::{Park, Unpark};
use tokio_timer::clock::Now;
use tokio_timer::timer::Timer;
use futures::future::{lazy, Future};
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
macro_rules! assert_ready {
($f:expr) => {{
use ::futures::Async::*;
match $f.poll().unwrap() {
Ready(v) => v,
NotReady => panic!("NotReady"),
}
}};
($f:expr, $($msg:expr),+) => {{
use ::futures::Async::*;
match $f.poll().unwrap() {
Ready(v) => v,
NotReady => {
let msg = format!($($msg),+);
panic!("NotReady; {}", msg)
}
}
}}
}
macro_rules! assert_ready_eq {
($f:expr, $expect:expr) => {
assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect));
};
}
macro_rules! assert_not_ready {
($f:expr) => {{
let res = $f.poll().unwrap();
assert!(!res.is_ready(), "actual={:?}", res)
}};
($f:expr, $($msg:expr),+) => {{
let res = $f.poll().unwrap();
if res.is_ready() {
let msg = format!($($msg),+);
panic!("actual={:?}; {}", res, msg);
}
}};
}
macro_rules! assert_elapsed {
($f:expr) => {
assert!($f.poll().unwrap_err().is_elapsed());
};
}
#[derive(Debug)]
pub struct MockTime {
inner: Inner,
_p: PhantomData<Rc<()>>,
}
#[derive(Debug)]
pub struct MockNow {
inner: Inner,
}
#[derive(Debug)]
pub struct MockPark {
inner: Inner,
_p: PhantomData<Rc<()>>,
}
#[derive(Debug)]
pub struct MockUnpark {
inner: Inner,
}
type Inner = Arc<Mutex<State>>;
#[derive(Debug)]
struct State {
base: Instant,
advance: Duration,
unparked: bool,
park_for: Option<Duration>,
}
pub fn ms(num: u64) -> Duration {
Duration::from_millis(num)
}
pub trait IntoTimeout {
fn into_timeout(self) -> Option<Duration>;
}
impl IntoTimeout for Option<Duration> {
fn into_timeout(self) -> Self {
self
}
}
impl IntoTimeout for Duration {
fn into_timeout(self) -> Option<Duration> {
Some(self)
}
}
/// Turn the timer state once
pub fn turn<T: IntoTimeout>(timer: &mut Timer<MockPark>, duration: T) {
timer.turn(duration.into_timeout()).unwrap();
}
/// Advance the timer the specified amount
pub fn advance(timer: &mut Timer<MockPark>, duration: Duration) {
let inner = timer.get_park().inner.clone();
let deadline = inner.lock().unwrap().now() + duration;
while inner.lock().unwrap().now() < deadline {
let dur = deadline - inner.lock().unwrap().now();
turn(timer, dur);
}
}
pub fn mocked<F, R>(f: F) -> R
where
F: FnOnce(&mut Timer<MockPark>, &mut MockTime) -> R,
{
mocked_with_now(Instant::now(), f)
}
pub fn mocked_with_now<F, R>(now: Instant, f: F) -> R
where
F: FnOnce(&mut Timer<MockPark>, &mut MockTime) -> R,
{
let mut time = MockTime::new(now);
let park = time.mock_park();
let now = ::tokio_timer::clock::Clock::new_with_now(time.mock_now());
let mut enter = ::tokio_executor::enter().unwrap();
::tokio_timer::clock::with_default(&now, &mut enter, |enter| {
let mut timer = Timer::new(park);
let handle = timer.handle();
::tokio_timer::with_default(&handle, enter, |_| {
lazy(|| Ok::<_, ()>(f(&mut timer, &mut time)))
.wait()
.unwrap()
})
})
}
impl MockTime {
pub fn new(now: Instant) -> MockTime {
let state = State {
base: now,
advance: Duration::default(),
unparked: false,
park_for: None,
};
MockTime {
inner: Arc::new(Mutex::new(state)),
_p: PhantomData,
}
}
pub fn mock_now(&self) -> MockNow {
let inner = self.inner.clone();
MockNow { inner }
}
pub fn mock_park(&self) -> MockPark {
let inner = self.inner.clone();
MockPark {
inner,
_p: PhantomData,
}
}
pub fn now(&self) -> Instant {
self.inner.lock().unwrap().now()
}
/// Returns the total amount of time the time has been advanced.
pub fn advanced(&self) -> Duration {
self.inner.lock().unwrap().advance
}
pub fn advance(&self, duration: Duration) {
let mut inner = self.inner.lock().unwrap();
inner.advance(duration);
}
/// The next call to park_timeout will be for this duration, regardless of
/// the timeout passed to `park_timeout`.
pub fn park_for(&self, duration: Duration) {
self.inner.lock().unwrap().park_for = Some(duration);
}
}
impl Park for MockPark {
type Unpark = MockUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
let inner = self.inner.clone();
MockUnpark { inner }
}
fn park(&mut self) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().map_err(|_| ())?;
let duration = inner.park_for.take().expect("call park_for first");
inner.advance(duration);
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
if let Some(duration) = inner.park_for.take() {
inner.advance(duration);
} else {
inner.advance(duration);
}
Ok(())
}
}
impl Unpark for MockUnpark {
fn unpark(&self) {
if let Ok(mut inner) = self.inner.lock() {
inner.unparked = true;
}
}
}
impl Now for MockNow {
fn now(&self) -> Instant {
self.inner.lock().unwrap().now()
}
}
impl State {
fn now(&self) -> Instant {
self.base + self.advance
}
fn advance(&mut self, duration: Duration) {
self.advance += duration;
}
}

View File

@ -28,7 +28,6 @@ futures-util-preview = "=0.3.0-alpha.18"
pin-project = "=0.4.0-alpha.11" pin-project = "=0.4.0-alpha.11"
tower-service = "=0.3.0-alpha.1" tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" } tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tower-util = { version = "=0.3.0-alpha.1", path = "../tower-util" }
tokio-executor = "=0.2.0-alpha.4" tokio-executor = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4" tokio-sync = "=0.2.0-alpha.4"

View File

@ -3,6 +3,7 @@
use crate::error::Error; use crate::error::Error;
use futures_core::ready; use futures_core::ready;
use pin_project::pin_project; use pin_project::pin_project;
use std::marker::PhantomData;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -11,18 +12,13 @@ use std::{
use tokio_executor::TypedExecutor; use tokio_executor::TypedExecutor;
use tokio_sync::oneshot; use tokio_sync::oneshot;
use tower_service::Service; use tower_service::Service;
use tower_util::Ready;
#[pin_project] #[pin_project]
/// Drives a service to readiness. /// Drives a service to readiness.
pub struct BackgroundReady<T, Request> pub struct BackgroundReady<T, Request> {
where service: Option<T>,
T: Service<Request>,
T::Error: Into<Error>,
{
#[pin]
ready: Ready<T, Request>,
tx: Option<oneshot::Sender<Result<T, Error>>>, tx: Option<oneshot::Sender<Result<T, Error>>>,
_req: PhantomData<Request>,
} }
/// This trait allows you to use either Tokio's threaded runtime's executor or /// This trait allows you to use either Tokio's threaded runtime's executor or
@ -55,8 +51,9 @@ where
{ {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let bg = BackgroundReady { let bg = BackgroundReady {
ready: Ready::new(service), service: Some(service),
tx: Some(tx), tx: Some(tx),
_req: PhantomData,
}; };
(bg, rx) (bg, rx)
} }
@ -75,7 +72,9 @@ where
return Poll::Ready(()); return Poll::Ready(());
} }
let result = ready!(this.ready.poll(cx)); let result = ready!(this.service.as_mut().expect("illegal state").poll_ready(cx))
.map(|()| this.service.take().expect("illegal state"));
let _ = this let _ = this
.tx .tx
.take() .take()

View File

@ -13,42 +13,38 @@ use tower_service::Service;
/// ///
/// `Ready` values are produced by `ServiceExt::ready`. /// `Ready` values are produced by `ServiceExt::ready`.
#[pin_project] #[pin_project]
pub struct Ready<T, Request> { pub struct Ready<'a, T, Request> {
inner: Option<T>, inner: &'a mut T,
_p: PhantomData<fn() -> Request>, _p: PhantomData<fn() -> Request>,
} }
impl<T, Request> Ready<T, Request> impl<'a, T, Request> Ready<'a, T, Request>
where where
T: Service<Request>, T: Service<Request>,
{ {
pub fn new(service: T) -> Self { pub fn new(service: &'a mut T) -> Self {
Ready { Ready {
inner: Some(service), inner: service,
_p: PhantomData, _p: PhantomData,
} }
} }
} }
impl<T, Request> Future for Ready<T, Request> impl<'a, T, Request> Future for Ready<'a, T, Request>
where where
T: Service<Request>, T: Service<Request>,
{ {
type Output = Result<T, T::Error>; type Output = Result<(), T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
ready!(this ready!(this.inner.poll_ready(cx))?;
.inner
.as_mut()
.expect("called `poll` after future completed")
.poll_ready(cx))?;
Poll::Ready(Ok(this.inner.take().unwrap())) Poll::Ready(Ok(()))
} }
} }
impl<T, Request> fmt::Debug for Ready<T, Request> impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request>
where where
T: fmt::Debug, T: fmt::Debug,
{ {

View File

@ -14,7 +14,7 @@ type Error = Box<dyn std::error::Error + Send + Sync>;
/// adapters /// adapters
pub trait ServiceExt<Request>: Service<Request> { pub trait ServiceExt<Request>: Service<Request> {
/// A future yielding the service when it is ready to accept a request. /// A future yielding the service when it is ready to accept a request.
fn ready(self) -> Ready<Self, Request> fn ready(&mut self) -> Ready<'_, Self, Request>
where where
Self: Sized, Self: Sized,
{ {

View File

@ -17,7 +17,7 @@ async fn builder_service() {
pin_mut!(handle); pin_mut!(handle);
let policy = MockPolicy; let policy = MockPolicy;
let client = ServiceBuilder::new() let mut client = ServiceBuilder::new()
.layer(BufferLayer::new(5)) .layer(BufferLayer::new(5))
.layer(ConcurrencyLimitLayer::new(5)) .layer(ConcurrencyLimitLayer::new(5))
.layer(RateLimitLayer::new(5, Duration::from_secs(1))) .layer(RateLimitLayer::new(5, Duration::from_secs(1)))
@ -28,7 +28,7 @@ async fn builder_service() {
// allow a request through // allow a request through
handle.allow(1); handle.allow(1);
let mut client = client.ready().await.unwrap(); client.ready().await.unwrap();
let fut = client.call("hello"); let fut = client.call("hello");
let (request, rsp) = poll_fn(|cx| handle.as_mut().poll_request(cx)) let (request, rsp) = poll_fn(|cx| handle.as_mut().poll_request(cx))
.await .await