From 8390a1d2888e012972fa5cbdee898ce69d0ad6bc Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Fri, 15 Feb 2019 17:52:00 -0500 Subject: [PATCH] Remove DirectService from tower-{balance,buffer} (#159) --- tower-balance/Cargo.toml | 1 - tower-balance/src/lib.rs | 165 +++++++++-------------------------- tower-buffer/Cargo.toml | 1 - tower-buffer/src/lib.rs | 180 +++++---------------------------------- 4 files changed, 61 insertions(+), 286 deletions(-) diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index 46868e7..3cd0bd4 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -10,7 +10,6 @@ log = "0.4.1" rand = "0.6" tokio-timer = "0.2.4" tower-service = { version = "0.2", path = "../tower-service" } -tower-direct-service = { version = "0.1", path = "../tower-direct-service" } tower-discover = { version = "0.1", path = "../tower-discover" } indexmap = "1" diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index 0409f66..e7482bc 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -7,7 +7,6 @@ extern crate indexmap; extern crate quickcheck; extern crate rand; extern crate tokio_timer; -extern crate tower_direct_service; extern crate tower_discover; extern crate tower_service; @@ -16,7 +15,6 @@ use indexmap::IndexMap; use rand::{rngs::SmallRng, SeedableRng}; use std::marker::PhantomData; use std::{error, fmt}; -use tower_direct_service::DirectService; use tower_discover::Discover; use tower_service::Service; @@ -187,9 +185,11 @@ where /// /// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted /// into `ready`, potentially altering the order of `ready` and/or `not_ready`. - fn promote_to_ready(&mut self, mut poll_ready: F) -> Result<(), Error> + fn promote_to_ready( + &mut self, + ) -> Result<(), Error<>::Error, D::Error>> where - F: FnMut(&mut D::Service) -> Poll<(), E>, + D::Service: Service, { let n = self.not_ready.len(); if n == 0 { @@ -206,7 +206,7 @@ where .not_ready .get_index_mut(idx) .expect("invalid not_ready index");; - poll_ready(svc).map_err(Error::Inner)?.is_ready() + svc.poll_ready().map_err(Error::Inner)?.is_ready() }; trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready); if is_ready { @@ -230,17 +230,16 @@ where /// /// If the service exists in `ready` and does not poll as ready, it is moved to /// `not_ready`, potentially altering the order of `ready` and/or `not_ready`. - fn poll_ready_index( + fn poll_ready_index( &mut self, idx: usize, - mut poll_ready: F, - ) -> Option>> + ) -> Option>::Error, D::Error>>> where - F: FnMut(&mut D::Service) -> Poll<(), E>, + D::Service: Service, { match self.ready.get_index_mut(idx) { None => return None, - Some((_, svc)) => match poll_ready(svc) { + Some((_, svc)) => match svc.poll_ready() { Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))), Err(e) => return Some(Err(Error::Inner(e))), Ok(Async::NotReady) => {} @@ -258,9 +257,11 @@ where /// Chooses the next service to which a request will be dispatched. /// /// Ensures that . - fn choose_and_poll_ready(&mut self, mut poll_ready: F) -> Poll<(), Error> + fn choose_and_poll_ready( + &mut self, + ) -> Poll<(), Error<>::Error, D::Error>> where - F: FnMut(&mut D::Service) -> Poll<(), E>, + D::Service: Service, { loop { let n = self.ready.len(); @@ -276,7 +277,7 @@ where // XXX Should we handle per-endpoint errors? if self - .poll_ready_index(idx, &mut poll_ready) + .poll_ready_index(idx) .expect("invalid ready index")? 
.is_ready() { @@ -285,45 +286,6 @@ where } } } - - fn poll_ready_inner(&mut self, mut poll_ready: F) -> Poll<(), Error> - where - F: FnMut(&mut D::Service) -> Poll<(), E>, - { - // Clear before `ready` is altered. - self.chosen_ready_index = None; - - // Before `ready` is altered, check the readiness of the last-used service, moving it - // to `not_ready` if appropriate. - if let Some(idx) = self.dispatched_ready_index.take() { - // XXX Should we handle per-endpoint errors? - self.poll_ready_index(idx, &mut poll_ready) - .expect("invalid dispatched ready key")?; - } - - // Update `not_ready` and `ready`. - self.update_from_discover()?; - self.promote_to_ready(&mut poll_ready)?; - - // Choose the next service to be used by `call`. - self.choose_and_poll_ready(&mut poll_ready) - } - - fn call(&mut self, call: F, request: Request) -> ResponseFuture - where - F: FnOnce(&mut D::Service, Request) -> FF, - FF: Future, - { - let idx = self.chosen_ready_index.take().expect("not ready"); - let (_, svc) = self - .ready - .get_index_mut(idx) - .expect("invalid chosen ready index"); - self.dispatched_ready_index = Some(idx); - - let rsp = call(svc, request); - ResponseFuture(rsp, PhantomData) - } } impl Service for Balance @@ -341,84 +303,35 @@ where /// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index /// into `ready` referring to a `Service` that is ready to disptach a request. fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.poll_ready_inner(D::Service::poll_ready) + // Clear before `ready` is altered. + self.chosen_ready_index = None; + + // Before `ready` is altered, check the readiness of the last-used service, moving it + // to `not_ready` if appropriate. + if let Some(idx) = self.dispatched_ready_index.take() { + // XXX Should we handle per-endpoint errors? + self.poll_ready_index(idx) + .expect("invalid dispatched ready key")?; + } + + // Update `not_ready` and `ready`. + self.update_from_discover()?; + self.promote_to_ready()?; + + // Choose the next service to be used by `call`. + self.choose_and_poll_ready() } fn call(&mut self, request: Request) -> Self::Future { - self.call(D::Service::call, request) - } -} + let idx = self.chosen_ready_index.take().expect("not ready"); + let (_, svc) = self + .ready + .get_index_mut(idx) + .expect("invalid chosen ready index"); + self.dispatched_ready_index = Some(idx); -impl DirectService for Balance -where - D: Discover, - D::Service: DirectService, - C: Choose, -{ - type Response = >::Response; - type Error = Error<>::Error, D::Error>; - type Future = ResponseFuture<>::Future, D::Error>; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.poll_ready_inner(D::Service::poll_ready) - } - - fn call(&mut self, request: Request) -> Self::Future { - self.call(D::Service::call, request) - } - - fn poll_service(&mut self) -> Poll<(), Self::Error> { - let mut any_not_ready = false; - - // TODO: don't re-poll services that return Ready until call is invoked on them - - for (_, svc) in &mut self.ready { - if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? { - any_not_ready = true; - } - } - - for (_, svc) in &mut self.not_ready { - if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? 
{ - any_not_ready = true; - } - } - - if any_not_ready { - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) - } - } - - fn poll_close(&mut self) -> Poll<(), Self::Error> { - let mut err = None; - self.ready.retain(|_, svc| match svc.poll_close() { - Ok(Async::Ready(())) => return false, - Ok(Async::NotReady) => return true, - Err(e) => { - err = Some(e); - return false; - } - }); - self.not_ready.retain(|_, svc| match svc.poll_close() { - Ok(Async::Ready(())) => return false, - Ok(Async::NotReady) => return true, - Err(e) => { - err = Some(e); - return false; - } - }); - - if let Some(e) = err { - return Err(Error::Inner(e)); - } - - if self.ready.is_empty() && self.not_ready.is_empty() { - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) - } + let rsp = svc.call(request); + ResponseFuture(rsp, PhantomData) } } diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml index 1df8482..e960cd6 100644 --- a/tower-buffer/Cargo.toml +++ b/tower-buffer/Cargo.toml @@ -7,7 +7,6 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } -tower-direct-service = { version = "0.1", path = "../tower-direct-service" } tokio-executor = "0.1" lazycell = "1.2" tokio-sync = "0.1" diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index d2308c7..2e9bd52 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -10,18 +10,15 @@ extern crate futures; extern crate lazycell; extern crate tokio_executor; extern crate tokio_sync; -extern crate tower_direct_service; extern crate tower_service; use futures::future::Executor; use futures::{Async, Future, Poll, Stream}; -use std::marker::PhantomData; use std::sync::Arc; use std::{error, fmt}; use tokio_executor::DefaultExecutor; use tokio_sync::mpsc; use tokio_sync::oneshot; -use tower_direct_service::DirectService; use tower_service::Service; /// Adds a buffer in front of an inner service. @@ -35,9 +32,6 @@ where state: Arc>, } -/// A [`Buffer`] that is backed by a `DirectService`. -pub type DirectBuffer = Buffer, Request>; - /// Future eventually completed with the response to the original request. pub struct ResponseFuture { state: ResponseState, @@ -68,65 +62,6 @@ pub enum Error { Full, } -/// An adapter that exposes the associated types of a `DirectService` through `Service`. -/// This type does *not* let you pretend that a `DirectService` is a `Service`; that would be -/// incorrect, as the caller would then not call `poll_service` and `poll_close` as necessary on -/// the underlying `DirectService`. Instead, it merely provides a type-level adapter which allows -/// types that are generic over `T: Service`, but only need access to associated types of `T`, to -/// also take a `DirectService` ([`Buffer`] is an example of such a type). -pub struct DirectServiceRef { - _marker: PhantomData, -} - -impl Service for DirectServiceRef -where - T: DirectService, -{ - type Response = T::Response; - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - unreachable!("tried to poll a DirectService through a marker reference") - } - - fn call(&mut self, _: Request) -> Self::Future { - unreachable!("tried to call a DirectService through a marker reference") - } -} - -/// A wrapper that exposes a `Service` (which does not need to be driven) as a `DirectService` so -/// that a construct that is *able* to take a `DirectService` can also take instances of -/// `Service`. 
-pub struct DirectedService(T); - -impl DirectService for DirectedService -where - T: Service, -{ - type Response = T::Response; - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready() - } - - fn poll_service(&mut self) -> Poll<(), Self::Error> { - // TODO: is this the right thing to do? - Ok(Async::Ready(())) - } - - fn poll_close(&mut self) -> Poll<(), Self::Error> { - // TODO: is this the right thing to do? - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.0.call(req) - } -} - mod sealed { use super::*; @@ -134,7 +69,7 @@ mod sealed { /// directly, instead `Buffer` requires an `Executor` that can accept this task. pub struct Worker where - T: DirectService, + T: Service, { pub(crate) current_message: Option>, pub(crate) rx: mpsc::Receiver>, @@ -150,12 +85,12 @@ use sealed::Worker; /// runtime's executor depending on if `T` is `Send` or `!Send`. pub trait WorkerExecutor: Executor> where - T: DirectService, + T: Service, { } impl>> WorkerExecutor for E where - T: DirectService + T: Service { } @@ -215,44 +150,7 @@ where /// backpressure is applied to callers. pub fn with_executor(service: T, bound: usize, executor: &E) -> Result> where - E: WorkerExecutor, Request>, - { - let (tx, rx) = mpsc::channel(bound); - - let state = Arc::new(State { - err: lazycell::AtomicLazyCell::new(), - }); - - match Worker::spawn(DirectedService(service), rx, state.clone(), executor) { - Ok(()) => Ok(Buffer { tx, state: state }), - Err(DirectedService(service)) => Err(SpawnError { inner: service }), - } - } - - fn get_error_on_closed(&self) -> Arc> { - self.state - .err - .borrow() - .cloned() - .expect("Worker exited, but did not set error.") - } -} - -impl Buffer, Request> -where - T: DirectService, -{ - /// Creates a new `Buffer` wrapping the given directly driven `service`. - /// - /// `executor` is used to spawn a new `Worker` task that is dedicated to - /// draining the buffer and dispatching the requests to the internal - /// service. - /// - /// `bound` gives the maximal number of requests that can be queued for the service before - /// backpressure is applied to callers. - pub fn new_direct(service: T, bound: usize, executor: &E) -> Result> - where - E: Executor>, + E: WorkerExecutor, { let (tx, rx) = mpsc::channel(bound); @@ -265,6 +163,14 @@ where Err(service) => Err(SpawnError { inner: service }), } } + + fn get_error_on_closed(&self) -> Arc> { + self.state + .err + .borrow() + .cloned() + .expect("Worker exited, but did not set error.") + } } impl Service for Buffer @@ -364,7 +270,7 @@ where impl Worker where - T: DirectService, + T: Service, { fn spawn( service: T, @@ -393,7 +299,7 @@ where impl Worker where - T: DirectService, + T: Service, { /// Return the next queued Message that hasn't been canceled. fn poll_next_msg(&mut self) -> Poll>, ()> { @@ -454,13 +360,16 @@ where impl Future for Worker where - T: DirectService, + T: Service, { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { - let mut any_outstanding = true; + if self.finish { + return Ok(().into()); + } + loop { match self.poll_next_msg()? { Async::Ready(Some(msg)) => { @@ -479,19 +388,12 @@ where // An error means the request had been canceled in-between // our calls, the response future will just be dropped. let _ = msg.tx.send(Ok(response)); - - // Try to queue another request before we poll outstanding requests. 
- any_outstanding = true; continue; } Ok(Async::NotReady) => { // Put out current message back in its slot. self.current_message = Some(msg); - - if !any_outstanding { - return Ok(Async::NotReady); - } - // We may want to also make progress on current requests + return Ok(Async::NotReady); } Err(e) => { self.failed("poll_ready", e); @@ -505,57 +407,19 @@ where Async::Ready(None) => { // No more more requests _ever_. self.finish = true; + return Ok(Async::Ready(())); } Async::NotReady if self.failed.is_some() => { // No need to poll the service as it has already failed. return Ok(Async::NotReady); } - Async::NotReady if any_outstanding => { - // Make some progress on the service if we can. - } Async::NotReady => { - // There are no outstanding requests to make progress on. - // And we don't have any new requests to enqueue. + // We don't have any new requests to enqueue. // So we yield. return Ok(Async::NotReady); } } - - if self.finish { - try_ready!(self.service.poll_close().map_err(move |e| { - self.failed("poll_close", e); - })); - // We are all done! - break; - } else { - debug_assert!(any_outstanding); - if let Async::Ready(()) = self.service.poll_service().map_err(|e| { - self.failed("poll_service", e); - })? { - // Note to future iterations that there's no reason to call poll_service. - any_outstanding = false; - } else { - // The service can't make any more progress. - // Let's see how we can have gotten here: - // - // - If poll_next_msg returned NotReady, we should return NotReady. - // - If poll_next_msg returned Ready(None), we'd have self.finish = true, - // but we're in the else clause, so that can't be the case. - // - If poll_next_msg returned Ready(Some) and poll_ready() returned NotReady, - // we should return NotReady here as well, since the service can't make - // progress yet to accept the message. - // - If poll_next_msg returned Ready(Some) and poll_ready() returned Ready, - // we'd have continued, so that can't be the case. - // - // Thus, in all cases when we get to this point, we should return NotReady. - // So: - return Ok(Async::NotReady); - } - } } - - // All senders are dropped... the task is no longer needed - Ok(().into()) } }
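
After this patch, `Buffer` and `Balance` drive the wrapped services purely through `tower_service::Service`: the worker loop above only calls `poll_ready` and `call`, and `Balance` bounds `D::Service: Service<Request>` rather than providing a separate `DirectService` implementation. Below is a minimal sketch of the resulting caller-side usage against the futures 0.1 / tower-service 0.2 API used on this branch; it is illustrative only, the `Echo` service is hypothetical, and a `Buffer::new(service, bound)` convenience constructor (spawning the worker on the default executor, as `with_executor` does with an explicit one) is assumed to exist in this revision.

    extern crate futures;
    extern crate tokio;
    extern crate tower_buffer;
    extern crate tower_service;

    use futures::{future, Async, Future, Poll};
    use tower_buffer::Buffer;
    use tower_service::Service;

    /// Hypothetical inner service used only for illustration: it echoes the request.
    struct Echo;

    impl Service<String> for Echo {
        type Response = String;
        type Error = ();
        type Future = future::FutureResult<String, ()>;

        fn poll_ready(&mut self) -> Poll<(), Self::Error> {
            // A plain `Service` only reports readiness here; there is no separate
            // `poll_service`/`poll_close` that a caller has to keep driving.
            Ok(Async::Ready(()))
        }

        fn call(&mut self, request: String) -> Self::Future {
            future::ok(request)
        }
    }

    fn main() {
        // The buffer's worker task is spawned on the default executor, so
        // construction has to happen inside a runtime context (tokio 0.1 here).
        tokio::run(future::lazy(|| {
            // Assumed convenience constructor; `with_executor` is the variant
            // shown in the patch itself.
            let mut svc: Buffer<Echo, String> = match Buffer::new(Echo, 5) {
                Ok(svc) => svc,
                Err(_) => panic!("spawning the buffer worker failed"),
            };
            // In real code `poll_ready` is checked before `call`, per the
            // `Service` contract; this sketch just fires a single request.
            svc.call(String::from("hello"))
                .map(|rsp| println!("response: {}", rsp))
                .map_err(|_| eprintln!("buffer error"))
        }));
    }

One consequence of the removal is that anything relying on the `poll_service`/`poll_close` driving model can no longer be adapted through `DirectedService`; such a service now has to be driven by its own worker task behind the `Service` interface.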