Update buffer and prepare for release (#377)

* Update buffer and prepare for release

* Update tower-buffer/src/service.rs

Co-Authored-By: Eliza Weisman <eliza@buoyant.io>

* fmt
This commit is contained in:
Lucio Franco 2019-12-04 20:31:27 -05:00 committed by GitHub
parent 15c58e8842
commit 54dd475ec0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 166 additions and 292 deletions

View File

@ -3,7 +3,7 @@
members = [
# "tower",
# "tower-balance",
# "tower-buffer",
"tower-buffer",
# "tower-discover",
# "tower-filter",
# "tower-hedge",

View File

@ -1,3 +1,10 @@
# 0.3.0 (December 4, 2019)
- Update to `tokio 0.2`
- Update to `futures-core 0.3`
- Update to `tower-service 0.3`
- Update to `tower-layer 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`

View File

@ -8,7 +8,7 @@ name = "tower-buffer"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.2"
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
@ -26,15 +26,14 @@ log = ["tracing/log"]
default = ["log"]
[dependencies]
futures-core-preview = "=0.3.0-alpha.19"
futures-core = "0.3"
pin-project = "0.4"
tower-service = { version = "=0.3.0-alpha.2", path = "../tower-service" }
tower-layer = { version = "=0.3.0-alpha.2", path = "../tower-layer" }
tokio-executor = "=0.2.0-alpha.6"
tokio-sync = "=0.2.0-alpha.6"
tower-service = { version = "0.3", path = "../tower-service" }
tower-layer = { version = "0.3", path = "../tower-layer" }
tokio = { version = "0.2", features = ["sync"] }
tracing = "0.1.2"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.2", path = "../tower-test" }
tokio-test = { version = "=0.2.0-alpha.6" }
futures-util-preview = "=0.3.0-alpha.19"
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = { version = "0.2" }
tokio = { version = "0.2", features = ["macros"] }

View File

@ -14,12 +14,6 @@ pub struct Closed {
_p: (),
}
/// Error produced when spawning the worker fails
#[derive(Debug)]
pub struct SpawnError {
_p: (),
}
/// Errors produced by `Buffer`.
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
@ -66,19 +60,3 @@ impl fmt::Display for Closed {
}
impl std::error::Error for Closed {}
// ===== impl SpawnError =====
impl SpawnError {
pub(crate) fn new() -> SpawnError {
SpawnError { _p: () }
}
}
impl fmt::Display for SpawnError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "failed to spawn Buffer worker task")
}
}
impl std::error::Error for SpawnError {}

View File

@ -1,56 +1,39 @@
use crate::{error::Error, service::Buffer, worker::WorkerExecutor};
use crate::{error::Error, service::Buffer};
use std::{fmt, marker::PhantomData};
use tokio_executor::DefaultExecutor;
use tower_layer::Layer;
use tower_service::Service;
/// Buffer requests with a bounded buffer
pub struct BufferLayer<Request, E = DefaultExecutor> {
pub struct BufferLayer<Request> {
bound: usize,
executor: E,
_p: PhantomData<fn(Request)>,
}
impl<Request> BufferLayer<Request, DefaultExecutor> {
#[allow(missing_docs)]
impl<Request> BufferLayer<Request> {
/// Create a new `BufferLayer` with the provided `bound`.
pub fn new(bound: usize) -> Self {
BufferLayer {
bound,
executor: DefaultExecutor::current(),
_p: PhantomData,
}
}
}
impl<Request, E: Clone> BufferLayer<Request, E> {
/// Create a new buffered service layer spawned on the given executor.
pub fn with_executor(bound: usize, executor: E) -> Self {
BufferLayer {
bound,
executor,
_p: PhantomData,
}
}
}
impl<E, S, Request> Layer<S> for BufferLayer<Request, E>
impl<S, Request> Layer<S> for BufferLayer<Request>
where
S: Service<Request>,
S::Error: Into<Error>,
E: WorkerExecutor<S, Request> + Clone,
S: Service<Request> + Send + 'static,
S::Future: Send,
S::Error: Into<Error> + Send + Sync,
Request: Send + 'static,
{
type Service = Buffer<S, Request>;
fn layer(&self, service: S) -> Self::Service {
Buffer::with_executor(service, self.bound, &mut self.executor.clone())
Buffer::new(service, self.bound)
}
}
impl<Request, E> fmt::Debug for BufferLayer<Request, E>
where
// Require E: Debug in case we want to print the executor at a later date
E: fmt::Debug,
{
impl<Request> fmt::Debug for BufferLayer<Request> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer")
.field("bound", &self.bound)

View File

@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0-alpha.2")]
#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
@ -23,4 +23,3 @@ mod worker;
pub use crate::layer::BufferLayer;
pub use crate::service::Buffer;
pub use crate::worker::WorkerExecutor;

View File

@ -1,5 +1,5 @@
use crate::error::ServiceError;
use tokio_sync::oneshot;
use tokio::sync::oneshot;
/// Message sent over buffer
#[derive(Debug)]

View File

@ -1,14 +1,13 @@
use crate::{
error::{Error, SpawnError},
error::Error,
future::ResponseFuture,
message::Message,
worker::{Handle, Worker, WorkerExecutor},
worker::{Handle, Worker},
};
use futures_core::ready;
use std::task::{Context, Poll};
use tokio_executor::DefaultExecutor;
use tokio_sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot};
use tower_service::Service;
/// Adds a buffer in front of an inner service.
@ -20,7 +19,7 @@ where
T: Service<Request>,
{
tx: mpsc::Sender<Message<Request, T::Future>>,
worker: Option<Handle>,
handle: Handle,
}
impl<T, Request> Buffer<T, Request>
@ -42,35 +41,30 @@ where
T::Error: Send + Sync,
Request: Send + 'static,
{
Self::with_executor(service, bound, &mut DefaultExecutor::current())
let (tx, rx) = mpsc::channel(bound);
let (handle, worker) = Worker::new(service, rx);
tokio::spawn(worker);
Buffer { tx, handle }
}
/// Creates a new `Buffer` wrapping `service`.
/// Creates a new `Buffer` wrapping `service` but returns the background worker.
///
/// `executor` is used to spawn a new `Worker` task that is dedicated to
/// draining the buffer and dispatching the requests to the internal
/// service.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
pub fn with_executor<E>(service: T, bound: usize, executor: &mut E) -> Self
/// This is useful if you do not want to spawn directly onto the `tokio` runtime
/// but instead want to use your own executor. This will return the `Buffer` and
/// the background `Worker` that you can then spawn.
pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
where
E: WorkerExecutor<T, Request>,
T: Send + 'static,
T::Error: Send + Sync,
Request: Send + 'static,
{
let (tx, rx) = mpsc::channel(bound);
let worker = Worker::spawn(service, rx, executor);
Buffer { tx, worker }
let (handle, worker) = Worker::new(service, rx);
(Buffer { tx, handle }, worker)
}
fn get_worker_error(&self) -> Error {
self.worker
.as_ref()
.map(|w| w.get_error_on_closed())
.unwrap_or_else(|| {
// If there's no worker handle, that's because spawning it
// at the beginning failed.
SpawnError::new().into()
})
self.handle.get_error_on_closed()
}
}
@ -105,19 +99,18 @@ where
let span = tracing::Span::current();
tracing::trace!(parent: &span, "sending request to buffer worker");
match self.tx.try_send(Message { request, span, tx }) {
Err(e) => {
if e.is_closed() {
ResponseFuture::failed(self.get_worker_error())
} else {
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender`
// handles may not send a message using that slot. This
// guarantees capacity for `request`.
//
// Given this, the only way to hit this code path is if
// `poll_ready` has not been called & `Ready` returned.
panic!("buffer full; poll_ready must be called first");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
ResponseFuture::failed(self.get_worker_error())
}
Err(mpsc::error::TrySendError::Full(_)) => {
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender`
// handles may not send a message using that slot. This
// guarantees capacity for `request`.
//
// Given this, the only way to hit this code path is if
// `poll_ready` has not been called & `Ready` returned.
panic!("buffer full; poll_ready must be called first");
}
Ok(_) => ResponseFuture::new(rx),
}
@ -131,7 +124,7 @@ where
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
worker: self.worker.clone(),
handle: self.handle.clone(),
}
}
}

View File

@ -10,8 +10,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio_executor::TypedExecutor;
use tokio_sync::mpsc;
use tokio::sync::mpsc;
use tower_service::Service;
/// Task that handles processing the buffer. This type should not be used
@ -42,35 +41,15 @@ pub(crate) struct Handle {
inner: Arc<Mutex<Option<ServiceError>>>,
}
/// This trait allows you to use either Tokio's threaded runtime's executor or the `current_thread`
/// runtime's executor depending on if `T` is `Send` or `!Send`.
pub trait WorkerExecutor<T, Request>: TypedExecutor<Worker<T, Request>>
where
T: Service<Request>,
T::Error: Into<Error>,
{
}
impl<T, Request, E: TypedExecutor<Worker<T, Request>>> WorkerExecutor<T, Request> for E
where
T: Service<Request>,
T::Error: Into<Error>,
{
}
impl<T, Request> Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
pub(crate) fn spawn<E>(
pub(crate) fn new(
service: T,
rx: mpsc::Receiver<Message<Request, T::Future>>,
executor: &mut E,
) -> Option<Handle>
where
E: WorkerExecutor<T, Request>,
{
) -> (Handle, Worker<T, Request>) {
let handle = Handle {
inner: Arc::new(Mutex::new(None)),
};
@ -84,10 +63,7 @@ where
handle: handle.clone(),
};
match executor.spawn(worker) {
Ok(()) => Some(handle),
Err(_) => None,
}
(handle, worker)
}
/// Return the next queued Message that hasn't been canceled.

View File

@ -1,223 +1,150 @@
use futures_util::pin_mut;
use std::future::Future;
use std::{cell::RefCell, thread};
use tokio_executor::{SpawnError, TypedExecutor};
use std::thread;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
use tower_buffer::{error, Buffer};
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
fn let_worker_work() {
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
thread::sleep(::std::time::Duration::from_millis(100));
}
#[test]
fn req_and_res() {
task::mock(|cx| {
let (mut service, handle) = new_service();
#[tokio::test]
async fn req_and_res() {
let (mut service, mut handle) = new_service();
let response = service.call("hello");
pin_mut!(response);
pin_mut!(handle);
let mut response = task::spawn(service.call("hello"));
assert_request_eq!(handle, "hello").send_response("world");
assert_request_eq!(handle, "hello").send_response("world");
let_worker_work();
assert_eq!(assert_ready_ok!(response.as_mut().poll(cx)), "world");
});
let_worker_work();
assert_eq!(assert_ready_ok!(response.poll()), "world");
}
#[test]
fn clears_canceled_requests() {
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
#[tokio::test]
async fn clears_canceled_requests() {
let (mut service, mut handle) = new_service();
handle.allow(1);
handle.allow(1);
let res1 = service.call("hello");
pin_mut!(res1);
let mut res1 = task::spawn(service.call("hello"));
let send_response1 = assert_request_eq!(handle, "hello");
let send_response1 = assert_request_eq!(handle, "hello");
// don't respond yet, new requests will get buffered
// don't respond yet, new requests will get buffered
let res2 = service.call("hello2");
let res2 = task::spawn(service.call("hello2"));
assert_pending!(handle.as_mut().poll_request(cx));
assert_pending!(handle.poll_request());
let res3 = service.call("hello3");
pin_mut!(res3);
let mut res3 = task::spawn(service.call("hello3"));
drop(res2);
drop(res2);
send_response1.send_response("world");
send_response1.send_response("world");
let_worker_work();
assert_eq!(assert_ready_ok!(res1.poll(cx)), "world");
let_worker_work();
assert_eq!(assert_ready_ok!(res1.poll()), "world");
// res2 was dropped, so it should have been canceled in the buffer
handle.allow(1);
// res2 was dropped, so it should have been canceled in the buffer
handle.allow(1);
assert_request_eq!(handle, "hello3").send_response("world3");
assert_request_eq!(handle, "hello3").send_response("world3");
let_worker_work();
assert_eq!(assert_ready_ok!(res3.poll(cx)), "world3");
});
let_worker_work();
assert_eq!(assert_ready_ok!(res3.poll()), "world3");
}
#[test]
fn when_inner_is_not_ready() {
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
#[tokio::test]
async fn when_inner_is_not_ready() {
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
// Make the service NotReady
handle.allow(0);
let res1 = service.call("hello");
pin_mut!(res1);
let mut res1 = task::spawn(service.call("hello"));
let_worker_work();
assert_pending!(res1.as_mut().poll(cx));
assert_pending!(handle.as_mut().poll_request(cx));
let_worker_work();
assert_pending!(res1.poll());
assert_pending!(handle.poll_request());
handle.allow(1);
handle.allow(1);
assert_request_eq!(handle, "hello").send_response("world");
assert_request_eq!(handle, "hello").send_response("world");
let_worker_work();
assert_eq!(assert_ready_ok!(res1.poll(cx)), "world");
});
let_worker_work();
assert_eq!(assert_ready_ok!(res1.poll()), "world");
}
#[test]
fn when_inner_fails() {
task::mock(|cx| {
use std::error::Error as StdError;
#[tokio::test]
async fn when_inner_fails() {
use std::error::Error as StdError;
let (mut service, mut handle) = new_service();
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
let res1 = service.call("hello");
pin_mut!(res1);
let mut res1 = task::spawn(service.call("hello"));
let_worker_work();
let e = assert_ready_err!(res1.poll(cx));
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
let e = e.source().unwrap();
let_worker_work();
let e = assert_ready_err!(res1.poll());
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
let e = e.source().unwrap();
assert_eq!(e.to_string(), "foobar");
} else {
panic!("unexpected error type: {:?}", e);
}
});
assert_eq!(e.to_string(), "foobar");
} else {
panic!("unexpected error type: {:?}", e);
}
}
#[test]
fn when_spawn_fails() {
task::mock(|cx| {
let (service, _handle) = mock::pair::<(), ()>();
#[tokio::test]
async fn poll_ready_when_worker_is_dropped_early() {
let (service, _handle) = mock::pair::<(), ()>();
let mut exec = ExecFn(|_| Err(()));
let (service, worker) = Buffer::pair(service, 1);
let mut service = Buffer::with_executor(service, 1, &mut exec);
let mut service = mock::Spawn::new(service);
let err = assert_ready_err!(service.poll_ready(cx));
drop(worker);
assert!(
err.is::<error::SpawnError>(),
"should be a SpawnError: {:?}",
err
);
})
let err = assert_ready_err!(service.poll_ready());
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
}
#[test]
fn poll_ready_when_worker_is_dropped_early() {
task::mock(|cx| {
let (service, _handle) = mock::pair::<(), ()>();
#[tokio::test]
async fn response_future_when_worker_is_dropped_early() {
let (service, mut handle) = mock::pair::<_, ()>();
// drop that worker right on the floor!
let mut exec = ExecFn(|fut| {
drop(fut);
Ok(())
});
let (service, worker) = Buffer::pair(service, 1);
let mut service = Buffer::with_executor(service, 1, &mut exec);
let mut service = mock::Spawn::new(service);
let err = assert_ready_err!(service.poll_ready(cx));
// keep the request in the worker
handle.allow(0);
let mut response = task::spawn(service.call("hello"));
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
});
}
drop(worker);
#[test]
fn response_future_when_worker_is_dropped_early() {
task::mock(|cx| {
let (service, mut handle) = mock::pair::<_, ()>();
// hold the worker in a cell until we want to drop it later
let cell = RefCell::new(None);
let mut exec = ExecFn(|fut| {
*cell.borrow_mut() = Some(fut);
Ok(())
});
let mut service = Buffer::with_executor(service, 1, &mut exec);
// keep the request in the worker
handle.allow(0);
let response = service.call("hello");
pin_mut!(response);
// drop the worker (like an executor closing up)
cell.borrow_mut().take();
let_worker_work();
let err = assert_ready_err!(response.poll(cx));
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
})
let_worker_work();
let err = assert_ready_err!(response.poll());
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
struct Exec;
impl<F> TypedExecutor<F> for Exec
where
F: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
thread::spawn(move || {
let mut mock = tokio_test::task::MockTask::new();
pin_mut!(fut);
while mock.poll(fut.as_mut()).is_pending() {}
});
Ok(())
}
}
struct ExecFn<Func>(Func);
impl<Func, F> TypedExecutor<F> for ExecFn<Func>
where
Func: Fn(F) -> Result<(), ()>,
F: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
(self.0)(fut).map_err(|()| SpawnError::shutdown())
}
}
fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
let (service, handle) = mock::pair();
fn new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests
let service = Buffer::with_executor(service, 10, &mut Exec);
(service, handle)
mock::spawn_with(|s| {
let (svc, worker) = Buffer::pair(s, 10);
thread::spawn(move || {
let mut fut = tokio_test::task::spawn(worker);
while fut.poll().is_pending() {}
});
svc
})
}

View File

@ -39,6 +39,18 @@ pub fn spawn<T, U>() -> (Spawn<Mock<T, U>>, Handle<T, U>) {
(Spawn::new(svc), handle)
}
/// Spawn a Service via the provided wrapper closure.
pub fn spawn_with<T, U, F, S>(f: F) -> (Spawn<S>, Handle<T, U>)
where
F: Fn(Mock<T, U>) -> S,
{
let (svc, handle) = pair();
let svc = f(svc);
(Spawn::new(svc), handle)
}
/// A mock service
#[derive(Debug)]
pub struct Mock<T, U> {