diff --git a/Cargo.toml b/Cargo.toml index 420eceb..7e41f45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ # "tower", # "tower-balance", - # "tower-buffer", + "tower-buffer", # "tower-discover", # "tower-filter", # "tower-hedge", diff --git a/tower-buffer/CHANGELOG.md b/tower-buffer/CHANGELOG.md index 6476fde..70c11c2 100644 --- a/tower-buffer/CHANGELOG.md +++ b/tower-buffer/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.3.0 (December 4, 2019) + +- Update to `tokio 0.2` +- Update to `futures-core 0.3` +- Update to `tower-service 0.3` +- Update to `tower-layer 0.3` + # 0.3.0-alpha.2 (September 30, 2019) - Move to `futures-*-preview 0.3.0-alpha.19` diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml index 47b3b45..fd0113c 100644 --- a/tower-buffer/Cargo.toml +++ b/tower-buffer/Cargo.toml @@ -8,7 +8,7 @@ name = "tower-buffer" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.3.0-alpha.2" +version = "0.3.0" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" @@ -26,15 +26,14 @@ log = ["tracing/log"] default = ["log"] [dependencies] -futures-core-preview = "=0.3.0-alpha.19" +futures-core = "0.3" pin-project = "0.4" -tower-service = { version = "=0.3.0-alpha.2", path = "../tower-service" } -tower-layer = { version = "=0.3.0-alpha.2", path = "../tower-layer" } -tokio-executor = "=0.2.0-alpha.6" -tokio-sync = "=0.2.0-alpha.6" +tower-service = { version = "0.3", path = "../tower-service" } +tower-layer = { version = "0.3", path = "../tower-layer" } +tokio = { version = "0.2", features = ["sync"] } tracing = "0.1.2" [dev-dependencies] -tower-test = { version = "=0.3.0-alpha.2", path = "../tower-test" } -tokio-test = { version = "=0.2.0-alpha.6" } -futures-util-preview = "=0.3.0-alpha.19" +tower-test = { version = "0.3", path = "../tower-test" } +tokio-test = { version = "0.2" } +tokio = { version = "0.2", features = ["macros"] } diff --git a/tower-buffer/src/error.rs b/tower-buffer/src/error.rs index de4dac2..2ddbb35 100644 --- a/tower-buffer/src/error.rs +++ b/tower-buffer/src/error.rs @@ -14,12 +14,6 @@ pub struct Closed { _p: (), } -/// Error produced when spawning the worker fails -#[derive(Debug)] -pub struct SpawnError { - _p: (), -} - /// Errors produced by `Buffer`. pub(crate) type Error = Box; @@ -66,19 +60,3 @@ impl fmt::Display for Closed { } impl std::error::Error for Closed {} - -// ===== impl SpawnError ===== - -impl SpawnError { - pub(crate) fn new() -> SpawnError { - SpawnError { _p: () } - } -} - -impl fmt::Display for SpawnError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "failed to spawn Buffer worker task") - } -} - -impl std::error::Error for SpawnError {} diff --git a/tower-buffer/src/layer.rs b/tower-buffer/src/layer.rs index d6af0fb..7f19480 100644 --- a/tower-buffer/src/layer.rs +++ b/tower-buffer/src/layer.rs @@ -1,56 +1,39 @@ -use crate::{error::Error, service::Buffer, worker::WorkerExecutor}; +use crate::{error::Error, service::Buffer}; use std::{fmt, marker::PhantomData}; -use tokio_executor::DefaultExecutor; use tower_layer::Layer; use tower_service::Service; /// Buffer requests with a bounded buffer -pub struct BufferLayer { +pub struct BufferLayer { bound: usize, - executor: E, _p: PhantomData, } -impl BufferLayer { - #[allow(missing_docs)] +impl BufferLayer { + /// Create a new `BufferLayer` with the provided `bound`. pub fn new(bound: usize) -> Self { BufferLayer { bound, - executor: DefaultExecutor::current(), _p: PhantomData, } } } -impl BufferLayer { - /// Create a new buffered service layer spawned on the given executor. - pub fn with_executor(bound: usize, executor: E) -> Self { - BufferLayer { - bound, - executor, - _p: PhantomData, - } - } -} - -impl Layer for BufferLayer +impl Layer for BufferLayer where - S: Service, - S::Error: Into, - E: WorkerExecutor + Clone, + S: Service + Send + 'static, + S::Future: Send, + S::Error: Into + Send + Sync, + Request: Send + 'static, { type Service = Buffer; fn layer(&self, service: S) -> Self::Service { - Buffer::with_executor(service, self.bound, &mut self.executor.clone()) + Buffer::new(service, self.bound) } } -impl fmt::Debug for BufferLayer -where - // Require E: Debug in case we want to print the executor at a later date - E: fmt::Debug, -{ +impl fmt::Debug for BufferLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("BufferLayer") .field("bound", &self.bound) diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index c650c3a..24499d7 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0-alpha.2")] +#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0")] #![warn( missing_debug_implementations, missing_docs, @@ -23,4 +23,3 @@ mod worker; pub use crate::layer::BufferLayer; pub use crate::service::Buffer; -pub use crate::worker::WorkerExecutor; diff --git a/tower-buffer/src/message.rs b/tower-buffer/src/message.rs index a84d58d..98e7744 100644 --- a/tower-buffer/src/message.rs +++ b/tower-buffer/src/message.rs @@ -1,5 +1,5 @@ use crate::error::ServiceError; -use tokio_sync::oneshot; +use tokio::sync::oneshot; /// Message sent over buffer #[derive(Debug)] diff --git a/tower-buffer/src/service.rs b/tower-buffer/src/service.rs index 17c94e0..2a05dc9 100644 --- a/tower-buffer/src/service.rs +++ b/tower-buffer/src/service.rs @@ -1,14 +1,13 @@ use crate::{ - error::{Error, SpawnError}, + error::Error, future::ResponseFuture, message::Message, - worker::{Handle, Worker, WorkerExecutor}, + worker::{Handle, Worker}, }; use futures_core::ready; use std::task::{Context, Poll}; -use tokio_executor::DefaultExecutor; -use tokio_sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tower_service::Service; /// Adds a buffer in front of an inner service. @@ -20,7 +19,7 @@ where T: Service, { tx: mpsc::Sender>, - worker: Option, + handle: Handle, } impl Buffer @@ -42,35 +41,30 @@ where T::Error: Send + Sync, Request: Send + 'static, { - Self::with_executor(service, bound, &mut DefaultExecutor::current()) + let (tx, rx) = mpsc::channel(bound); + let (handle, worker) = Worker::new(service, rx); + tokio::spawn(worker); + Buffer { tx, handle } } - /// Creates a new `Buffer` wrapping `service`. + /// Creates a new `Buffer` wrapping `service` but returns the background worker. /// - /// `executor` is used to spawn a new `Worker` task that is dedicated to - /// draining the buffer and dispatching the requests to the internal - /// service. - /// - /// `bound` gives the maximal number of requests that can be queued for the service before - /// backpressure is applied to callers. - pub fn with_executor(service: T, bound: usize, executor: &mut E) -> Self + /// This is useful if you do not want to spawn directly onto the `tokio` runtime + /// but instead want to use your own executor. This will return the `Buffer` and + /// the background `Worker` that you can then spawn. + pub fn pair(service: T, bound: usize) -> (Buffer, Worker) where - E: WorkerExecutor, + T: Send + 'static, + T::Error: Send + Sync, + Request: Send + 'static, { let (tx, rx) = mpsc::channel(bound); - let worker = Worker::spawn(service, rx, executor); - Buffer { tx, worker } + let (handle, worker) = Worker::new(service, rx); + (Buffer { tx, handle }, worker) } fn get_worker_error(&self) -> Error { - self.worker - .as_ref() - .map(|w| w.get_error_on_closed()) - .unwrap_or_else(|| { - // If there's no worker handle, that's because spawning it - // at the beginning failed. - SpawnError::new().into() - }) + self.handle.get_error_on_closed() } } @@ -105,19 +99,18 @@ where let span = tracing::Span::current(); tracing::trace!(parent: &span, "sending request to buffer worker"); match self.tx.try_send(Message { request, span, tx }) { - Err(e) => { - if e.is_closed() { - ResponseFuture::failed(self.get_worker_error()) - } else { - // When `mpsc::Sender::poll_ready` returns `Ready`, a slot - // in the channel is reserved for the handle. Other `Sender` - // handles may not send a message using that slot. This - // guarantees capacity for `request`. - // - // Given this, the only way to hit this code path is if - // `poll_ready` has not been called & `Ready` returned. - panic!("buffer full; poll_ready must be called first"); - } + Err(mpsc::error::TrySendError::Closed(_)) => { + ResponseFuture::failed(self.get_worker_error()) + } + Err(mpsc::error::TrySendError::Full(_)) => { + // When `mpsc::Sender::poll_ready` returns `Ready`, a slot + // in the channel is reserved for the handle. Other `Sender` + // handles may not send a message using that slot. This + // guarantees capacity for `request`. + // + // Given this, the only way to hit this code path is if + // `poll_ready` has not been called & `Ready` returned. + panic!("buffer full; poll_ready must be called first"); } Ok(_) => ResponseFuture::new(rx), } @@ -131,7 +124,7 @@ where fn clone(&self) -> Self { Self { tx: self.tx.clone(), - worker: self.worker.clone(), + handle: self.handle.clone(), } } } diff --git a/tower-buffer/src/worker.rs b/tower-buffer/src/worker.rs index d7e6fec..94bee09 100644 --- a/tower-buffer/src/worker.rs +++ b/tower-buffer/src/worker.rs @@ -10,8 +10,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio_executor::TypedExecutor; -use tokio_sync::mpsc; +use tokio::sync::mpsc; use tower_service::Service; /// Task that handles processing the buffer. This type should not be used @@ -42,35 +41,15 @@ pub(crate) struct Handle { inner: Arc>>, } -/// This trait allows you to use either Tokio's threaded runtime's executor or the `current_thread` -/// runtime's executor depending on if `T` is `Send` or `!Send`. -pub trait WorkerExecutor: TypedExecutor> -where - T: Service, - T::Error: Into, -{ -} - -impl>> WorkerExecutor for E -where - T: Service, - T::Error: Into, -{ -} - impl Worker where T: Service, T::Error: Into, { - pub(crate) fn spawn( + pub(crate) fn new( service: T, rx: mpsc::Receiver>, - executor: &mut E, - ) -> Option - where - E: WorkerExecutor, - { + ) -> (Handle, Worker) { let handle = Handle { inner: Arc::new(Mutex::new(None)), }; @@ -84,10 +63,7 @@ where handle: handle.clone(), }; - match executor.spawn(worker) { - Ok(()) => Some(handle), - Err(_) => None, - } + (handle, worker) } /// Return the next queued Message that hasn't been canceled. diff --git a/tower-buffer/tests/buffer.rs b/tower-buffer/tests/buffer.rs index 3cf476d..f26c3a9 100644 --- a/tower-buffer/tests/buffer.rs +++ b/tower-buffer/tests/buffer.rs @@ -1,223 +1,150 @@ -use futures_util::pin_mut; -use std::future::Future; -use std::{cell::RefCell, thread}; -use tokio_executor::{SpawnError, TypedExecutor}; +use std::thread; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; use tower_buffer::{error, Buffer}; -use tower_service::Service; use tower_test::{assert_request_eq, mock}; fn let_worker_work() { // Allow the Buffer's executor to do work - ::std::thread::sleep(::std::time::Duration::from_millis(100)); + thread::sleep(::std::time::Duration::from_millis(100)); } -#[test] -fn req_and_res() { - task::mock(|cx| { - let (mut service, handle) = new_service(); +#[tokio::test] +async fn req_and_res() { + let (mut service, mut handle) = new_service(); - let response = service.call("hello"); - pin_mut!(response); - pin_mut!(handle); + let mut response = task::spawn(service.call("hello")); - assert_request_eq!(handle, "hello").send_response("world"); + assert_request_eq!(handle, "hello").send_response("world"); - let_worker_work(); - assert_eq!(assert_ready_ok!(response.as_mut().poll(cx)), "world"); - }); + let_worker_work(); + assert_eq!(assert_ready_ok!(response.poll()), "world"); } -#[test] -fn clears_canceled_requests() { - task::mock(|cx| { - let (mut service, handle) = new_service(); - pin_mut!(handle); +#[tokio::test] +async fn clears_canceled_requests() { + let (mut service, mut handle) = new_service(); - handle.allow(1); + handle.allow(1); - let res1 = service.call("hello"); - pin_mut!(res1); + let mut res1 = task::spawn(service.call("hello")); - let send_response1 = assert_request_eq!(handle, "hello"); + let send_response1 = assert_request_eq!(handle, "hello"); - // don't respond yet, new requests will get buffered + // don't respond yet, new requests will get buffered - let res2 = service.call("hello2"); + let res2 = task::spawn(service.call("hello2")); - assert_pending!(handle.as_mut().poll_request(cx)); + assert_pending!(handle.poll_request()); - let res3 = service.call("hello3"); - pin_mut!(res3); + let mut res3 = task::spawn(service.call("hello3")); - drop(res2); + drop(res2); - send_response1.send_response("world"); + send_response1.send_response("world"); - let_worker_work(); - assert_eq!(assert_ready_ok!(res1.poll(cx)), "world"); + let_worker_work(); + assert_eq!(assert_ready_ok!(res1.poll()), "world"); - // res2 was dropped, so it should have been canceled in the buffer - handle.allow(1); + // res2 was dropped, so it should have been canceled in the buffer + handle.allow(1); - assert_request_eq!(handle, "hello3").send_response("world3"); + assert_request_eq!(handle, "hello3").send_response("world3"); - let_worker_work(); - assert_eq!(assert_ready_ok!(res3.poll(cx)), "world3"); - }); + let_worker_work(); + assert_eq!(assert_ready_ok!(res3.poll()), "world3"); } -#[test] -fn when_inner_is_not_ready() { - task::mock(|cx| { - let (mut service, handle) = new_service(); - pin_mut!(handle); +#[tokio::test] +async fn when_inner_is_not_ready() { + let (mut service, mut handle) = new_service(); - // Make the service NotReady - handle.allow(0); + // Make the service NotReady + handle.allow(0); - let res1 = service.call("hello"); - pin_mut!(res1); + let mut res1 = task::spawn(service.call("hello")); - let_worker_work(); - assert_pending!(res1.as_mut().poll(cx)); - assert_pending!(handle.as_mut().poll_request(cx)); + let_worker_work(); + assert_pending!(res1.poll()); + assert_pending!(handle.poll_request()); - handle.allow(1); + handle.allow(1); - assert_request_eq!(handle, "hello").send_response("world"); + assert_request_eq!(handle, "hello").send_response("world"); - let_worker_work(); - assert_eq!(assert_ready_ok!(res1.poll(cx)), "world"); - }); + let_worker_work(); + assert_eq!(assert_ready_ok!(res1.poll()), "world"); } -#[test] -fn when_inner_fails() { - task::mock(|cx| { - use std::error::Error as StdError; +#[tokio::test] +async fn when_inner_fails() { + use std::error::Error as StdError; - let (mut service, mut handle) = new_service(); + let (mut service, mut handle) = new_service(); - // Make the service NotReady - handle.allow(0); - handle.send_error("foobar"); + // Make the service NotReady + handle.allow(0); + handle.send_error("foobar"); - let res1 = service.call("hello"); - pin_mut!(res1); + let mut res1 = task::spawn(service.call("hello")); - let_worker_work(); - let e = assert_ready_err!(res1.poll(cx)); - if let Some(e) = e.downcast_ref::() { - let e = e.source().unwrap(); + let_worker_work(); + let e = assert_ready_err!(res1.poll()); + if let Some(e) = e.downcast_ref::() { + let e = e.source().unwrap(); - assert_eq!(e.to_string(), "foobar"); - } else { - panic!("unexpected error type: {:?}", e); - } - }); + assert_eq!(e.to_string(), "foobar"); + } else { + panic!("unexpected error type: {:?}", e); + } } -#[test] -fn when_spawn_fails() { - task::mock(|cx| { - let (service, _handle) = mock::pair::<(), ()>(); +#[tokio::test] +async fn poll_ready_when_worker_is_dropped_early() { + let (service, _handle) = mock::pair::<(), ()>(); - let mut exec = ExecFn(|_| Err(())); + let (service, worker) = Buffer::pair(service, 1); - let mut service = Buffer::with_executor(service, 1, &mut exec); + let mut service = mock::Spawn::new(service); - let err = assert_ready_err!(service.poll_ready(cx)); + drop(worker); - assert!( - err.is::(), - "should be a SpawnError: {:?}", - err - ); - }) + let err = assert_ready_err!(service.poll_ready()); + + assert!(err.is::(), "should be a Closed: {:?}", err); } -#[test] -fn poll_ready_when_worker_is_dropped_early() { - task::mock(|cx| { - let (service, _handle) = mock::pair::<(), ()>(); +#[tokio::test] +async fn response_future_when_worker_is_dropped_early() { + let (service, mut handle) = mock::pair::<_, ()>(); - // drop that worker right on the floor! - let mut exec = ExecFn(|fut| { - drop(fut); - Ok(()) - }); + let (service, worker) = Buffer::pair(service, 1); - let mut service = Buffer::with_executor(service, 1, &mut exec); + let mut service = mock::Spawn::new(service); - let err = assert_ready_err!(service.poll_ready(cx)); + // keep the request in the worker + handle.allow(0); + let mut response = task::spawn(service.call("hello")); - assert!(err.is::(), "should be a Closed: {:?}", err); - }); -} + drop(worker); -#[test] -fn response_future_when_worker_is_dropped_early() { - task::mock(|cx| { - let (service, mut handle) = mock::pair::<_, ()>(); - - // hold the worker in a cell until we want to drop it later - let cell = RefCell::new(None); - let mut exec = ExecFn(|fut| { - *cell.borrow_mut() = Some(fut); - Ok(()) - }); - - let mut service = Buffer::with_executor(service, 1, &mut exec); - - // keep the request in the worker - handle.allow(0); - let response = service.call("hello"); - pin_mut!(response); - - // drop the worker (like an executor closing up) - cell.borrow_mut().take(); - - let_worker_work(); - let err = assert_ready_err!(response.poll(cx)); - assert!(err.is::(), "should be a Closed: {:?}", err); - }) + let_worker_work(); + let err = assert_ready_err!(response.poll()); + assert!(err.is::(), "should be a Closed: {:?}", err); } type Mock = mock::Mock<&'static str, &'static str>; type Handle = mock::Handle<&'static str, &'static str>; -struct Exec; - -impl TypedExecutor for Exec -where - F: Future + Send + 'static, -{ - fn spawn(&mut self, fut: F) -> Result<(), SpawnError> { - thread::spawn(move || { - let mut mock = tokio_test::task::MockTask::new(); - pin_mut!(fut); - while mock.poll(fut.as_mut()).is_pending() {} - }); - Ok(()) - } -} - -struct ExecFn(Func); - -impl TypedExecutor for ExecFn -where - Func: Fn(F) -> Result<(), ()>, - F: Future + Send + 'static, -{ - fn spawn(&mut self, fut: F) -> Result<(), SpawnError> { - (self.0)(fut).map_err(|()| SpawnError::shutdown()) - } -} - -fn new_service() -> (Buffer, Handle) { - let (service, handle) = mock::pair(); +fn new_service() -> (mock::Spawn>, Handle) { // bound is >0 here because clears_canceled_requests needs multiple outstanding requests - let service = Buffer::with_executor(service, 10, &mut Exec); - (service, handle) + mock::spawn_with(|s| { + let (svc, worker) = Buffer::pair(s, 10); + + thread::spawn(move || { + let mut fut = tokio_test::task::spawn(worker); + while fut.poll().is_pending() {} + }); + + svc + }) } diff --git a/tower-test/src/mock/mod.rs b/tower-test/src/mock/mod.rs index 5adf7fb..6720c99 100644 --- a/tower-test/src/mock/mod.rs +++ b/tower-test/src/mock/mod.rs @@ -39,6 +39,18 @@ pub fn spawn() -> (Spawn>, Handle) { (Spawn::new(svc), handle) } +/// Spawn a Service via the provided wrapper closure. +pub fn spawn_with(f: F) -> (Spawn, Handle) +where + F: Fn(Mock) -> S, +{ + let (svc, handle) = pair(); + + let svc = f(svc); + + (Spawn::new(svc), handle) +} + /// A mock service #[derive(Debug)] pub struct Mock {