Update tower-retry to std::future (#326)

This bumps tower-retry to 0.3.0-alpha.1
This commit is contained in:
Jon Gjengset 2019-09-09 15:10:46 -04:00 committed by GitHub
parent 154bd69b9f
commit 4f71951221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 159 additions and 155 deletions

View File

@ -12,7 +12,7 @@ members = [
"tower-load",
"tower-load-shed",
# "tower-reconnect",
# "tower-retry",
"tower-retry",
"tower-service",
# "tower-spawn-ready",
"tower-test",

View File

@ -1,3 +1,7 @@
# 0.3.0-alpha.1
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -8,13 +8,13 @@ name = "tower-retry"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-retry/0.1.0"
documentation = "https://docs.rs/tower-retry/0.3.0-alpha.1"
description = """
Retry failed requests.
"""
@ -22,11 +22,13 @@ categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures = "0.1.26"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tokio-timer = "0.2.4"
tower-service = "0.3.0-alpha.1"
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
tokio-timer = "0.3.0-alpha.4"
pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] }
futures-core-preview = "0.3.0-alpha.18"
[dev-dependencies]
tower-test = { version = "0.1.0", path = "../tower-test" }
tokio-executor = "0.1.2"
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "0.2.0-alpha.4"
futures-util-preview = "0.3.0-alpha.18"

View File

@ -213,13 +213,8 @@ impl Bucket {
#[cfg(test)]
mod tests {
use self::tokio_executor::enter;
use super::*;
use std::{
sync::{Arc, Mutex, MutexGuard},
time::Instant,
};
use tokio_executor;
use tokio_test::clock;
#[test]
fn empty() {
@ -229,13 +224,11 @@ mod tests {
#[test]
fn leaky() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|time| {
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
bgt.deposit();
*time.as_mut() += Duration::from_secs(3);
time.advance(Duration::from_secs(3));
bgt.withdraw().unwrap_err();
});
@ -243,23 +236,21 @@ mod tests {
#[test]
fn slots() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|time| {
let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
bgt.deposit();
bgt.deposit();
*time.as_mut() += Duration::from_millis(900);
time.advance(Duration::from_millis(900));
// 900ms later, the deposit should still be valid
bgt.withdraw().unwrap();
// blank slate
*time.as_mut() += Duration::from_millis(2000);
time.advance(Duration::from_millis(2000));
bgt.deposit();
*time.as_mut() += Duration::from_millis(300);
time.advance(Duration::from_millis(300));
bgt.deposit();
*time.as_mut() += Duration::from_millis(800);
time.advance(Duration::from_millis(800));
bgt.deposit();
// the first deposit is expired, but the 2nd should still be valid,
@ -270,9 +261,7 @@ mod tests {
#[test]
fn reserve() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|_| {
let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
@ -283,19 +272,4 @@ mod tests {
bgt.withdraw().unwrap_err();
});
}
#[derive(Clone)]
struct MockNow(Arc<Mutex<Instant>>);
impl MockNow {
fn as_mut(&self) -> MutexGuard<Instant> {
self.0.lock().unwrap()
}
}
impl clock::Now for MockNow {
fn now(&self) -> Instant {
*self.0.lock().expect("now")
}
}
}

View File

@ -1,10 +1,19 @@
//! Future types
use crate::{Policy, Retry};
use futures::{try_ready, Async, Future, Poll};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower_service::Service;
// NOTE: this is the trait generated for Ready::project() by pin-project.
// We need it here to be able to go "through" Ready to &mut Service without adding Unpin bounds.
use crate::__RetryProjectionTrait;
/// The `Future` returned by a `Retry` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
where
@ -12,16 +21,19 @@ where
S: Service<Request>,
{
request: Option<Request>,
#[pin]
retry: Retry<P, S>,
state: State<S::Future, P::Future, S::Response, S::Error>,
#[pin]
state: State<S::Future, P::Future>,
}
#[pin_project]
#[derive(Debug)]
enum State<F, P, R, E> {
enum State<F, P> {
/// Polling the future from `Service::call`
Called(F),
Called(#[pin] F),
/// Polling the future from `Policy::retry`
Checking(P, Option<Result<R, E>>),
Checking(#[pin] P),
/// Polling `Service::poll_ready` after `Checking` was OK.
Retrying,
}
@ -49,56 +61,55 @@ where
P: Policy<Request, S::Response, S::Error> + Clone,
S: Service<Request> + Clone,
{
type Item = S::Response;
type Error = S::Error;
type Output = Result<S::Response, S::Error>;
#[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.state {
State::Called(ref mut future) => {
let result = match future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(res)) => Ok(res),
Err(err) => Err(err),
};
if let Some(ref req) = self.request {
match self.retry.policy.retry(req, result.as_ref()) {
Some(checking) => State::Checking(checking, Some(result)),
None => return result.map(Async::Ready),
#[project]
match this.state.project() {
State::Called(future) => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
Some(checking) => {
this.state.set(State::Checking(checking));
}
None => return Poll::Ready(result),
}
} else {
// request wasn't cloned, so no way to retry it
return result.map(Async::Ready);
return Poll::Ready(result);
}
}
State::Checking(ref mut future, ref mut result) => {
let policy = match future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(policy)) => policy,
Err(()) => {
// if Policy::retry() fails, return the original
// result...
return result
.take()
.expect("polled after complete")
.map(Async::Ready);
}
};
self.retry.policy = policy;
State::Retrying
State::Checking(future) => {
this.retry.project().policy.set(ready!(future.poll(cx)));
this.state.set(State::Retrying);
}
State::Retrying => {
try_ready!(self.retry.poll_ready());
let req = self
// NOTE: we assume here that
//
// this.retry.poll_ready()
//
// is equivalent to
//
// this.retry.service.poll_ready()
//
// we need to make that assumption to avoid adding an Unpin bound to the Policy
// in Ready to make it Unpin so that we can get &mut Ready as needed to call
// poll_ready on it.
ready!(this.retry.project().service.poll_ready(cx))?;
let req = this
.request
.take()
.expect("retrying requires cloned request");
self.request = self.retry.policy.clone_request(&req);
State::Called(self.retry.service.call(req))
*this.request = this.retry.policy.clone_request(&req);
this.state
.set(State::Called(this.retry.project().service.call(req)));
}
};
self.state = next;
}
}
}
}

View File

@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-retry/0.1.0")]
#![doc(html_root_url = "https://docs.rs/tower-retry/0.3.0-alpha.1")]
#![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(test, deny(warnings))]
#![allow(elided_lifetimes_in_paths)]
@ -14,14 +14,17 @@ pub use crate::layer::RetryLayer;
pub use crate::policy::Policy;
use crate::future::ResponseFuture;
use futures::Poll;
use pin_project::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;
/// Configure retrying requests of "failed" responses.
///
/// A `Policy` classifies what is a "failed" response.
#[pin_project]
#[derive(Clone, Debug)]
pub struct Retry<P, S> {
#[pin]
policy: P,
service: S,
}
@ -44,8 +47,11 @@ where
type Error = S::Error;
type Future = ResponseFuture<P, S, Request>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// NOTE: the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is
// equivalent to Ready.service.poll_ready. If this ever changes, that code must be updated
// as well.
self.service.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {

View File

@ -1,14 +1,12 @@
use futures::Future;
use std::future::Future;
/// A "retry policy" to classify if a request should be retried.
///
/// # Example
///
/// ```
/// extern crate futures;
/// extern crate tower_retry;
///
/// use tower_retry::Policy;
/// use futures_util::future;
///
/// type Req = String;
/// type Res = String;
@ -16,7 +14,7 @@ use futures::Future;
/// struct Attempts(usize);
///
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = futures::future::FutureResult<Self, ()>;
/// type Future = future::Ready<Self>;
///
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
/// match result {
@ -30,7 +28,7 @@ use futures::Future;
/// // But we limit the number of attempts...
/// if self.0 > 0 {
/// // Try again!
/// Some(futures::future::ok(Attempts(self.0 - 1)))
/// Some(future::ready(Attempts(self.0 - 1)))
/// } else {
/// // Used all our attempts, no retry...
/// None
@ -46,7 +44,7 @@ use futures::Future;
/// ```
pub trait Policy<Req, Res, E>: Sized {
/// The `Future` type returned by `Policy::retry()`.
type Future: Future<Item = Self, Error = ()>;
type Future: Future<Output = Self>;
/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
@ -56,9 +54,6 @@ pub trait Policy<Req, Res, E>: Sized {
///
/// If the request *should* be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
///
/// If the returned `Future` errors, the request will **not** be retried
/// after all.
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
/// Tries to clone a request before being passed to the inner service.
///

View File

@ -1,77 +1,101 @@
use futures::{future, Future};
use futures_util::{future, pin_mut};
use std::future::Future;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
use tower_retry::Policy;
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn retry_errors() {
let (mut service, mut handle) = new_service(RetryErrors);
task::mock(|cx| {
let (mut service, handle) = new_service(RetryErrors);
pin_mut!(handle);
assert!(service.poll_ready().unwrap().is_ready());
let mut fut = service.call("hello");
assert_ready_ok!(service.poll_ready(cx));
assert_request_eq!(handle, "hello").send_error("retry me");
let fut = service.call("hello");
pin_mut!(fut);
assert_not_ready(&mut fut);
assert_request_eq!(handle.as_mut(), "hello").send_error("retry me");
assert_request_eq!(handle, "hello").send_response("world");
assert_pending!(fut.as_mut().poll(cx));
assert_eq!(fut.wait().unwrap(), "world");
assert_request_eq!(handle.as_mut(), "hello").send_response("world");
assert_ready_ok!(fut.poll(cx), "world");
});
}
#[test]
fn retry_limit() {
let (mut service, mut handle) = new_service(Limit(2));
task::mock(|cx| {
let (mut service, handle) = new_service(Limit(2));
pin_mut!(handle);
assert!(service.poll_ready().unwrap().is_ready());
let mut fut = service.call("hello");
assert_ready_ok!(service.poll_ready(cx));
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_not_ready(&mut fut);
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry 2");
assert_not_ready(&mut fut);
assert_request_eq!(handle.as_mut(), "hello").send_error("retry 1");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_error("retry 3");
assert_eq!(fut.wait().unwrap_err().to_string(), "retry 3");
assert_request_eq!(handle.as_mut(), "hello").send_error("retry 2");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle.as_mut(), "hello").send_error("retry 3");
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 3");
});
}
#[test]
fn retry_error_inspection() {
let (mut service, mut handle) = new_service(UnlessErr("reject"));
task::mock(|cx| {
let (mut service, handle) = new_service(UnlessErr("reject"));
pin_mut!(handle);
assert!(service.poll_ready().unwrap().is_ready());
let mut fut = service.call("hello");
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_not_ready(&mut fut);
assert_request_eq!(handle.as_mut(), "hello").send_error("retry 1");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_error("reject");
assert_eq!(fut.wait().unwrap_err().to_string(), "reject");
assert_request_eq!(handle.as_mut(), "hello").send_error("reject");
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "reject");
});
}
#[test]
fn retry_cannot_clone_request() {
let (mut service, mut handle) = new_service(CannotClone);
task::mock(|cx| {
let (mut service, handle) = new_service(CannotClone);
pin_mut!(handle);
assert!(service.poll_ready().unwrap().is_ready());
let fut = service.call("hello");
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_eq!(fut.wait().unwrap_err().to_string(), "retry 1");
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 1");
});
}
#[test]
fn success_with_cannot_clone() {
// Even though the request couldn't be cloned, if the first request succeeds,
// it should succeed overall.
let (mut service, mut handle) = new_service(CannotClone);
task::mock(|cx| {
// Even though the request couldn't be cloned, if the first request succeeds,
// it should succeed overall.
let (mut service, handle) = new_service(CannotClone);
pin_mut!(handle);
assert!(service.poll_ready().unwrap().is_ready());
let fut = service.call("hello");
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(fut.wait().unwrap(), "world");
assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(fut.poll(cx), "world");
});
}
type Req = &'static str;
@ -85,10 +109,10 @@ type Handle = mock::Handle<Req, Res>;
struct RetryErrors;
impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::FutureResult<Self, ()>;
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ok(RetryErrors))
Some(future::ready(RetryErrors))
} else {
None
}
@ -103,10 +127,10 @@ impl Policy<Req, Res, Error> for RetryErrors {
struct Limit(usize);
impl Policy<Req, Res, Error> for Limit {
type Future = future::FutureResult<Self, ()>;
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
if result.is_err() && self.0 > 0 {
Some(future::ok(Limit(self.0 - 1)))
Some(future::ready(Limit(self.0 - 1)))
} else {
None
}
@ -121,11 +145,11 @@ impl Policy<Req, Res, Error> for Limit {
struct UnlessErr(InnerError);
impl Policy<Req, Res, Error> for UnlessErr {
type Future = future::FutureResult<Self, ()>;
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
result.err().and_then(|err| {
if err.to_string() != self.0 {
Some(future::ok(self.clone()))
Some(future::ready(self.clone()))
} else {
None
}
@ -141,7 +165,7 @@ impl Policy<Req, Res, Error> for UnlessErr {
struct CannotClone;
impl Policy<Req, Res, Error> for CannotClone {
type Future = future::FutureResult<Self, ()>;
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future> {
unreachable!("retry cannot be called since request isn't cloned");
}
@ -158,15 +182,3 @@ fn new_service<P: Policy<Req, Res, Error> + Clone>(
let service = tower_retry::Retry::new(policy, service);
(service, handle)
}
fn assert_not_ready<F: Future>(f: &mut F)
where
F::Error: ::std::fmt::Debug,
{
future::poll_fn(|| {
assert!(f.poll().unwrap().is_not_ready());
Ok::<_, ()>(().into())
})
.wait()
.unwrap();
}