Update tower-buffer to std::future (#323)

This bumps tower-buffer to 0.3.0-alpha.1
This commit is contained in:
Jon Gjengset 2019-09-09 12:07:28 -04:00 committed by GitHub
parent f8097a60f6
commit 693965fa4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 201 additions and 173 deletions

View File

@ -3,7 +3,7 @@
members = [
# "tower",
# "tower-balance",
# "tower-buffer",
"tower-buffer",
"tower-discover",
# "tower-filter",
# "tower-hedge",

View File

@ -1,3 +1,7 @@
# 0.3.0-alpha.1
- Move to `std::future`
# 0.1.1 (July 19, 2019)
- Add `tracing` support

View File

@ -8,13 +8,13 @@ name = "tower-buffer"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.1"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-buffer/0.1.1"
documentation = "https://docs.rs/tower-buffer/0.3.0-alpha.1"
description = """
Buffer requests before dispatching to a `Service`.
"""
@ -26,13 +26,15 @@ log = ["tracing/log"]
default = ["log"]
[dependencies]
futures = "0.1.25"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tokio-executor = "0.1.7"
tokio-sync = "0.1.0"
futures-core-preview = "0.3.0-alpha.18"
pin-project = { version = "0.4.0-alpha.9", features = ["project_attr"] }
tower-service = "0.3.0-alpha.1"
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
tokio-executor = "0.2.0-alpha.4"
tokio-sync = "0.2.0-alpha.4"
tracing = "0.1.2"
[dev-dependencies]
tower = { version = "0.1.0", path = "../tower" }
tower-test = { version = "0.1.0", path = "../tower-test" }
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-test = { version = "0.2.0-alpha.4" }
futures-util-preview = "0.3.0-alpha.18"

View File

@ -4,24 +4,29 @@ use crate::{
error::{Closed, Error},
message,
};
use futures::{Async, Future, Poll};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future eventually completed with the response to the original request.
#[pin_project]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}
#[pin_project]
enum ResponseState<T> {
Failed(Option<Error>),
Rx(message::Rx<T>),
Poll(T),
Rx(#[pin] message::Rx<T>),
Poll(#[pin] T),
}
impl<T> ResponseFuture<T>
where
T: Future,
T::Error: Into<Error>,
{
impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx(rx),
@ -35,36 +40,32 @@ where
}
}
impl<T> Future for ResponseFuture<T>
impl<F, T, E> Future for ResponseFuture<F>
where
T: Future,
T::Error: Into<Error>,
F: Future<Output = Result<T, E>>,
E: Into<Error>,
{
type Item = T::Item;
type Error = Error;
type Output = Result<T, Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use self::ResponseState::*;
#[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let fut;
match self.state {
Failed(ref mut e) => {
return Err(e.take().expect("polled after error"));
#[project]
match this.state.project() {
ResponseState::Failed(e) => {
return Poll::Ready(Err(e.take().expect("polled after error")));
}
Rx(ref mut rx) => match rx.poll() {
Ok(Async::Ready(Ok(f))) => fut = f,
Ok(Async::Ready(Err(e))) => return Err(e.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => return Err(Closed::new().into()),
ResponseState::Rx(rx) => match ready!(rx.poll(cx)) {
Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
Ok(Err(e)) => return Poll::Ready(Err(e.into())),
Err(_) => return Poll::Ready(Err(Closed::new().into())),
},
Poll(ref mut fut) => {
return fut.poll().map_err(Into::into);
ResponseState::Poll(fut) => {
return fut.poll(cx).map_err(Into::into)
}
}
self.state = Poll(fut);
}
}
}

View File

@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-buffer/0.1.1")]
#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0-alpha.1")]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]

View File

@ -5,7 +5,8 @@ use crate::{
worker::{Handle, Worker, WorkerExecutor},
};
use futures::Poll;
use futures_core::ready;
use std::task::{Context, Poll};
use tokio_executor::DefaultExecutor;
use tokio_sync::{mpsc, oneshot};
use tower_service::Service;
@ -81,9 +82,13 @@ where
type Error = Error;
type Future = ResponseFuture<T::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If the inner service has errored, then we error here.
self.tx.poll_ready().map_err(|_| self.get_worker_error())
if let Err(_) = ready!(self.tx.poll_ready(cx)) {
Poll::Ready(Err(self.get_worker_error()))
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, request: Request) -> Self::Future {

View File

@ -2,8 +2,14 @@ use crate::{
error::{Closed, Error, ServiceError},
message::Message,
};
use futures::{try_ready, Async, Future, Poll, Stream};
use futures_core::{ready, stream::Stream};
use pin_project::pin_project;
use std::sync::{Arc, Mutex};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_executor::TypedExecutor;
use tokio_sync::mpsc;
use tower_service::Service;
@ -15,6 +21,7 @@ use tower_service::Service;
/// as part of the public API. This is the "sealed" pattern to include "private"
/// types in public traits that are not meant for consumers of the library to
/// implement (only call).
#[pin_project]
pub struct Worker<T, Request>
where
T: Service<Request>,
@ -85,35 +92,39 @@ where
///
/// If a `Message` is returned, the `bool` is true if this is the first time we received this
/// message, and false otherwise (i.e., we tried to forward it to the backing service before).
fn poll_next_msg(&mut self) -> Poll<Option<(Message<Request, T::Future>, bool)>, ()> {
fn poll_next_msg(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
if self.finish {
// We've already received None and are shutting down
return Ok(Async::Ready(None));
return Poll::Ready(None);
}
tracing::trace!("worker polling for next message");
if let Some(mut msg) = self.current_message.take() {
// poll_cancel returns Async::Ready is the receiver is dropped.
// Returning NotReady means it is still alive, so we should still
// poll_closed returns Poll::Ready is the receiver is dropped.
// Returning Pending means it is still alive, so we should still
// use it.
if msg.tx.poll_close()?.is_not_ready() {
if msg.tx.poll_closed(cx).is_pending() {
tracing::trace!("resuming buffered request");
return Ok(Async::Ready(Some((msg, false))));
return Poll::Ready(Some((msg, false)));
}
tracing::trace!("dropping cancelled buffered request");
}
// Get the next request
while let Some(mut msg) = try_ready!(self.rx.poll().map_err(|_| ())) {
if msg.tx.poll_close()?.is_not_ready() {
while let Some(mut msg) = ready!(Pin::new(&mut self.rx).poll_next(cx)) {
if msg.tx.poll_closed(cx).is_pending() {
tracing::trace!("processing new request");
return Ok(Async::Ready(Some((msg, true))));
return Poll::Ready(Some((msg, true)));
}
// Otherwise, request is canceled, so pop the next one.
tracing::trace!("dropping cancelled request");
}
Ok(Async::Ready(None))
Poll::Ready(None)
}
fn failed(&mut self, error: Error) {
@ -155,16 +166,15 @@ where
T: Service<Request>,
T::Error: Into<Error>,
{
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<(), ()> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.finish {
return Ok(().into());
return Poll::Ready(());
}
loop {
match try_ready!(self.poll_next_msg()) {
match ready!(self.poll_next_msg(cx)) {
Some((msg, first)) => {
let _guard = msg.span.enter();
if let Some(ref failed) = self.failed {
@ -178,8 +188,8 @@ where
resumed = !first,
message = "worker received request; waiting for service readiness"
);
match self.service.poll_ready() {
Ok(Async::Ready(())) => {
match self.service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
tracing::debug!(service.ready = true, message = "processing request");
let response = self.service.call(msg.request);
@ -190,14 +200,14 @@ where
tracing::trace!("returning response future");
let _ = msg.tx.send(Ok(response));
}
Ok(Async::NotReady) => {
Poll::Pending => {
tracing::trace!(service.ready = false, message = "delay");
// Put out current message back in its slot.
drop(_guard);
self.current_message = Some(msg);
return Ok(Async::NotReady);
return Poll::Pending;
}
Err(e) => {
Poll::Ready(Err(e)) => {
let error = e.into();
tracing::debug!({ %error }, "service failed");
drop(_guard);
@ -213,7 +223,7 @@ where
None => {
// No more more requests _ever_.
self.finish = true;
return Ok(Async::Ready(()));
return Poll::Ready(());
}
}
}

View File

@ -1,94 +1,104 @@
use futures::prelude::*;
use futures_util::pin_mut;
use std::future::Future;
use std::{cell::RefCell, thread};
use tokio_executor::{SpawnError, TypedExecutor};
use tower::{
buffer::{error, Buffer},
Service,
};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
use tower_buffer::{error, Buffer};
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn req_and_res() {
let (mut service, mut handle) = new_service();
task::mock(|cx| {
let (mut service, handle) = new_service();
let response = service.call("hello");
let response = service.call("hello");
pin_mut!(response);
pin_mut!(handle);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(response.wait().unwrap(), "world");
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(assert_ready_ok!(response.as_mut().poll(cx)), "world");
});
}
#[test]
fn clears_canceled_requests() {
let (mut service, mut handle) = new_service();
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
handle.allow(1);
handle.allow(1);
let res1 = service.call("hello");
let res1 = service.call("hello");
pin_mut!(res1);
let send_response1 = assert_request_eq!(handle, "hello");
let send_response1 = assert_request_eq!(handle.as_mut(), "hello");
// don't respond yet, new requests will get buffered
// don't respond yet, new requests will get buffered
let res2 = service.call("hello2");
with_task(|| {
assert!(handle.poll_request().unwrap().is_not_ready());
let res2 = service.call("hello2");
assert_pending!(handle.as_mut().poll_request(cx));
let res3 = service.call("hello3");
pin_mut!(res3);
drop(res2);
send_response1.send_response("world");
assert_eq!(assert_ready_ok!(res1.poll(cx)), "world");
// res2 was dropped, so it should have been canceled in the buffer
handle.allow(1);
assert_request_eq!(handle, "hello3").send_response("world3");
assert_eq!(assert_ready_ok!(res3.poll(cx)), "world3");
});
let res3 = service.call("hello3");
drop(res2);
send_response1.send_response("world");
assert_eq!(res1.wait().unwrap(), "world");
// res2 was dropped, so it should have been canceled in the buffer
handle.allow(1);
assert_request_eq!(handle, "hello3").send_response("world3");
assert_eq!(res3.wait().unwrap(), "world3");
}
#[test]
fn when_inner_is_not_ready() {
let (mut service, mut handle) = new_service();
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
// Make the service NotReady
handle.allow(0);
// Make the service NotReady
handle.allow(0);
let mut res1 = service.call("hello");
let res1 = service.call("hello");
pin_mut!(res1);
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
with_task(|| {
assert!(res1.poll().expect("res1.poll").is_not_ready());
assert!(handle.poll_request().expect("poll_request").is_not_ready());
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
assert_pending!(res1.as_mut().poll(cx));
assert_pending!(handle.as_mut().poll_request(cx));
handle.allow(1);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(assert_ready_ok!(res1.poll(cx)), "world");
});
handle.allow(1);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(res1.wait().expect("res1.wait"), "world");
}
#[test]
fn when_inner_fails() {
use std::error::Error as StdError;
task::mock(|cx| {
use std::error::Error as StdError;
let (mut service, mut handle) = new_service();
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
let mut res1 = service.call("hello");
let res1 = service.call("hello");
pin_mut!(res1);
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
with_task(|| {
let e = res1.poll().unwrap_err();
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
let e = assert_ready_err!(res1.poll(cx));
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
let e = e.source().unwrap();
@ -101,68 +111,67 @@ fn when_inner_fails() {
#[test]
fn when_spawn_fails() {
let (service, _handle) = mock::pair::<(), ()>();
task::mock(|cx| {
let (service, _handle) = mock::pair::<(), ()>();
let mut exec = ExecFn(|_| Err(()));
let mut exec = ExecFn(|_| Err(()));
let mut service = Buffer::with_executor(service, 1, &mut exec);
let mut service = Buffer::with_executor(service, 1, &mut exec);
let err = with_task(|| {
service
.poll_ready()
.expect_err("buffer poll_ready should error")
});
let err = assert_ready_err!(service.poll_ready(cx));
assert!(
err.is::<error::SpawnError>(),
"should be a SpawnError: {:?}",
err
);
assert!(
err.is::<error::SpawnError>(),
"should be a SpawnError: {:?}",
err
);
})
}
#[test]
fn poll_ready_when_worker_is_dropped_early() {
let (service, _handle) = mock::pair::<(), ()>();
task::mock(|cx| {
let (service, _handle) = mock::pair::<(), ()>();
// drop that worker right on the floor!
let mut exec = ExecFn(|fut| {
drop(fut);
Ok(())
// drop that worker right on the floor!
let mut exec = ExecFn(|fut| {
drop(fut);
Ok(())
});
let mut service = Buffer::with_executor(service, 1, &mut exec);
let err = assert_ready_err!(service.poll_ready(cx));
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
});
let mut service = Buffer::with_executor(service, 1, &mut exec);
let err = with_task(|| {
service
.poll_ready()
.expect_err("buffer poll_ready should error")
});
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
}
#[test]
fn response_future_when_worker_is_dropped_early() {
let (service, mut handle) = mock::pair::<_, ()>();
task::mock(|cx| {
let (service, mut handle) = mock::pair::<_, ()>();
// hold the worker in a cell until we want to drop it later
let cell = RefCell::new(None);
let mut exec = ExecFn(|fut| {
*cell.borrow_mut() = Some(fut);
Ok(())
});
// hold the worker in a cell until we want to drop it later
let cell = RefCell::new(None);
let mut exec = ExecFn(|fut| {
*cell.borrow_mut() = Some(fut);
Ok(())
});
let mut service = Buffer::with_executor(service, 1, &mut exec);
let mut service = Buffer::with_executor(service, 1, &mut exec);
// keep the request in the worker
handle.allow(0);
let response = service.call("hello");
// keep the request in the worker
handle.allow(0);
let response = service.call("hello");
pin_mut!(response);
// drop the worker (like an executor closing up)
cell.borrow_mut().take();
// drop the worker (like an executor closing up)
cell.borrow_mut().take();
let err = response.wait().expect_err("res.wait");
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
let err = assert_ready_err!(response.poll(cx));
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
})
}
type Mock = mock::Mock<&'static str, &'static str>;
@ -172,11 +181,13 @@ struct Exec;
impl<F> TypedExecutor<F> for Exec
where
F: Future<Item = (), Error = ()> + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
thread::spawn(move || {
fut.wait().unwrap();
let mut mock = tokio_test::task::MockTask::new();
pin_mut!(fut);
while mock.poll(fut.as_mut()).is_pending() {}
});
Ok(())
}
@ -187,7 +198,7 @@ struct ExecFn<Func>(Func);
impl<Func, F> TypedExecutor<F> for ExecFn<Func>
where
Func: Fn(F) -> Result<(), ()>,
F: Future<Item = (), Error = ()> + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
(self.0)(fut).map_err(|()| SpawnError::shutdown())
@ -200,8 +211,3 @@ fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
let service = Buffer::with_executor(service, 10, &mut Exec);
(service, handle)
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::lazy;
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}