Remove DirectService from tower-{balance,buffer} (#159)

This commit is contained in:
Jon Gjengset 2019-02-15 17:52:00 -05:00 committed by Carl Lerche
parent d7e1b8f5dd
commit 8390a1d288
4 changed files with 61 additions and 286 deletions

View File

@ -10,7 +10,6 @@ log = "0.4.1"
rand = "0.6"
tokio-timer = "0.2.4"
tower-service = { version = "0.2", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tower-discover = { version = "0.1", path = "../tower-discover" }
indexmap = "1"

View File

@ -7,7 +7,6 @@ extern crate indexmap;
extern crate quickcheck;
extern crate rand;
extern crate tokio_timer;
extern crate tower_direct_service;
extern crate tower_discover;
extern crate tower_service;
@ -16,7 +15,6 @@ use indexmap::IndexMap;
use rand::{rngs::SmallRng, SeedableRng};
use std::marker::PhantomData;
use std::{error, fmt};
use tower_direct_service::DirectService;
use tower_discover::Discover;
use tower_service::Service;
@ -187,9 +185,11 @@ where
///
/// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
/// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
fn promote_to_ready<F, E>(&mut self, mut poll_ready: F) -> Result<(), Error<E, D::Error>>
fn promote_to_ready<Request>(
&mut self,
) -> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
D::Service: Service<Request>,
{
let n = self.not_ready.len();
if n == 0 {
@ -206,7 +206,7 @@ where
.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
poll_ready(svc).map_err(Error::Inner)?.is_ready()
svc.poll_ready().map_err(Error::Inner)?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
@ -230,17 +230,16 @@ where
///
/// If the service exists in `ready` and does not poll as ready, it is moved to
/// `not_ready`, potentially altering the order of `ready` and/or `not_ready`.
fn poll_ready_index<F, E>(
fn poll_ready_index<Request>(
&mut self,
idx: usize,
mut poll_ready: F,
) -> Option<Poll<(), Error<E, D::Error>>>
) -> Option<Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
D::Service: Service<Request>,
{
match self.ready.get_index_mut(idx) {
None => return None,
Some((_, svc)) => match poll_ready(svc) {
Some((_, svc)) => match svc.poll_ready() {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Ok(Async::NotReady) => {}
@ -258,9 +257,11 @@ where
/// Chooses the next service to which a request will be dispatched.
///
/// Ensures that .
fn choose_and_poll_ready<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
fn choose_and_poll_ready<Request>(
&mut self,
) -> Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
D::Service: Service<Request>,
{
loop {
let n = self.ready.len();
@ -276,7 +277,7 @@ where
// XXX Should we handle per-endpoint errors?
if self
.poll_ready_index(idx, &mut poll_ready)
.poll_ready_index(idx)
.expect("invalid ready index")?
.is_ready()
{
@ -285,45 +286,6 @@ where
}
}
}
fn poll_ready_inner<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
// Clear before `ready` is altered.
self.chosen_ready_index = None;
// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx, &mut poll_ready)
.expect("invalid dispatched ready key")?;
}
// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready(&mut poll_ready)?;
// Choose the next service to be used by `call`.
self.choose_and_poll_ready(&mut poll_ready)
}
fn call<Request, F, FF>(&mut self, call: F, request: Request) -> ResponseFuture<FF, D::Error>
where
F: FnOnce(&mut D::Service, Request) -> FF,
FF: Future,
{
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self
.ready
.get_index_mut(idx)
.expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);
let rsp = call(svc, request);
ResponseFuture(rsp, PhantomData)
}
}
impl<D, C, Request> Service<Request> for Balance<D, C>
@ -341,84 +303,35 @@ where
/// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.poll_ready_inner(D::Service::poll_ready)
// Clear before `ready` is altered.
self.chosen_ready_index = None;
// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx)
.expect("invalid dispatched ready key")?;
}
// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready()?;
// Choose the next service to be used by `call`.
self.choose_and_poll_ready()
}
fn call(&mut self, request: Request) -> Self::Future {
self.call(D::Service::call, request)
}
}
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self
.ready
.get_index_mut(idx)
.expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);
impl<D, C, Request> DirectService<Request> for Balance<D, C>
where
D: Discover,
D::Service: DirectService<Request>,
C: Choose<D::Key, D::Service>,
{
type Response = <D::Service as DirectService<Request>>::Response;
type Error = Error<<D::Service as DirectService<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as DirectService<Request>>::Future, D::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.poll_ready_inner(D::Service::poll_ready)
}
fn call(&mut self, request: Request) -> Self::Future {
self.call(D::Service::call, request)
}
fn poll_service(&mut self) -> Poll<(), Self::Error> {
let mut any_not_ready = false;
// TODO: don't re-poll services that return Ready until call is invoked on them
for (_, svc) in &mut self.ready {
if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
any_not_ready = true;
}
}
for (_, svc) in &mut self.not_ready {
if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
any_not_ready = true;
}
}
if any_not_ready {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}
fn poll_close(&mut self) -> Poll<(), Self::Error> {
let mut err = None;
self.ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});
self.not_ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});
if let Some(e) = err {
return Err(Error::Inner(e));
}
if self.ready.is_empty() && self.not_ready.is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
let rsp = svc.call(request);
ResponseFuture(rsp, PhantomData)
}
}

View File

@ -7,7 +7,6 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tokio-executor = "0.1"
lazycell = "1.2"
tokio-sync = "0.1"

View File

@ -10,18 +10,15 @@ extern crate futures;
extern crate lazycell;
extern crate tokio_executor;
extern crate tokio_sync;
extern crate tower_direct_service;
extern crate tower_service;
use futures::future::Executor;
use futures::{Async, Future, Poll, Stream};
use std::marker::PhantomData;
use std::sync::Arc;
use std::{error, fmt};
use tokio_executor::DefaultExecutor;
use tokio_sync::mpsc;
use tokio_sync::oneshot;
use tower_direct_service::DirectService;
use tower_service::Service;
/// Adds a buffer in front of an inner service.
@ -35,9 +32,6 @@ where
state: Arc<State<T::Error>>,
}
/// A [`Buffer`] that is backed by a `DirectService`.
pub type DirectBuffer<T, Request> = Buffer<DirectServiceRef<T>, Request>;
/// Future eventually completed with the response to the original request.
pub struct ResponseFuture<T, E> {
state: ResponseState<T, E>,
@ -68,65 +62,6 @@ pub enum Error<E> {
Full,
}
/// An adapter that exposes the associated types of a `DirectService` through `Service`.
/// This type does *not* let you pretend that a `DirectService` is a `Service`; that would be
/// incorrect, as the caller would then not call `poll_service` and `poll_close` as necessary on
/// the underlying `DirectService`. Instead, it merely provides a type-level adapter which allows
/// types that are generic over `T: Service`, but only need access to associated types of `T`, to
/// also take a `DirectService` ([`Buffer`] is an example of such a type).
pub struct DirectServiceRef<T> {
_marker: PhantomData<T>,
}
impl<T, Request> Service<Request> for DirectServiceRef<T>
where
T: DirectService<Request>,
{
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
unreachable!("tried to poll a DirectService through a marker reference")
}
fn call(&mut self, _: Request) -> Self::Future {
unreachable!("tried to call a DirectService through a marker reference")
}
}
/// A wrapper that exposes a `Service` (which does not need to be driven) as a `DirectService` so
/// that a construct that is *able* to take a `DirectService` can also take instances of
/// `Service`.
pub struct DirectedService<T>(T);
impl<T, Request> DirectService<Request> for DirectedService<T>
where
T: Service<Request>,
{
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
fn poll_service(&mut self) -> Poll<(), Self::Error> {
// TODO: is this the right thing to do?
Ok(Async::Ready(()))
}
fn poll_close(&mut self) -> Poll<(), Self::Error> {
// TODO: is this the right thing to do?
Ok(Async::Ready(()))
}
fn call(&mut self, req: Request) -> Self::Future {
self.0.call(req)
}
}
mod sealed {
use super::*;
@ -134,7 +69,7 @@ mod sealed {
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
pub struct Worker<T, Request>
where
T: DirectService<Request>,
T: Service<Request>,
{
pub(crate) current_message: Option<Message<Request, T::Future, T::Error>>,
pub(crate) rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>,
@ -150,12 +85,12 @@ use sealed::Worker;
/// runtime's executor depending on if `T` is `Send` or `!Send`.
pub trait WorkerExecutor<T, Request>: Executor<sealed::Worker<T, Request>>
where
T: DirectService<Request>,
T: Service<Request>,
{
}
impl<T, Request, E: Executor<sealed::Worker<T, Request>>> WorkerExecutor<T, Request> for E where
T: DirectService<Request>
T: Service<Request>
{
}
@ -215,44 +150,7 @@ where
/// backpressure is applied to callers.
pub fn with_executor<E>(service: T, bound: usize, executor: &E) -> Result<Self, SpawnError<T>>
where
E: WorkerExecutor<DirectedService<T>, Request>,
{
let (tx, rx) = mpsc::channel(bound);
let state = Arc::new(State {
err: lazycell::AtomicLazyCell::new(),
});
match Worker::spawn(DirectedService(service), rx, state.clone(), executor) {
Ok(()) => Ok(Buffer { tx, state: state }),
Err(DirectedService(service)) => Err(SpawnError { inner: service }),
}
}
fn get_error_on_closed(&self) -> Arc<ServiceError<T::Error>> {
self.state
.err
.borrow()
.cloned()
.expect("Worker exited, but did not set error.")
}
}
impl<T, Request> Buffer<DirectServiceRef<T>, Request>
where
T: DirectService<Request>,
{
/// Creates a new `Buffer` wrapping the given directly driven `service`.
///
/// `executor` is used to spawn a new `Worker` task that is dedicated to
/// draining the buffer and dispatching the requests to the internal
/// service.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
pub fn new_direct<E>(service: T, bound: usize, executor: &E) -> Result<Self, SpawnError<T>>
where
E: Executor<Worker<T, Request>>,
E: WorkerExecutor<T, Request>,
{
let (tx, rx) = mpsc::channel(bound);
@ -265,6 +163,14 @@ where
Err(service) => Err(SpawnError { inner: service }),
}
}
fn get_error_on_closed(&self) -> Arc<ServiceError<T::Error>> {
self.state
.err
.borrow()
.cloned()
.expect("Worker exited, but did not set error.")
}
}
impl<T, Request> Service<Request> for Buffer<T, Request>
@ -364,7 +270,7 @@ where
impl<T, Request> Worker<T, Request>
where
T: DirectService<Request>,
T: Service<Request>,
{
fn spawn<E>(
service: T,
@ -393,7 +299,7 @@ where
impl<T, Request> Worker<T, Request>
where
T: DirectService<Request>,
T: Service<Request>,
{
/// Return the next queued Message that hasn't been canceled.
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future, T::Error>>, ()> {
@ -454,13 +360,16 @@ where
impl<T, Request> Future for Worker<T, Request>
where
T: DirectService<Request>,
T: Service<Request>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
let mut any_outstanding = true;
if self.finish {
return Ok(().into());
}
loop {
match self.poll_next_msg()? {
Async::Ready(Some(msg)) => {
@ -479,19 +388,12 @@ where
// An error means the request had been canceled in-between
// our calls, the response future will just be dropped.
let _ = msg.tx.send(Ok(response));
// Try to queue another request before we poll outstanding requests.
any_outstanding = true;
continue;
}
Ok(Async::NotReady) => {
// Put out current message back in its slot.
self.current_message = Some(msg);
if !any_outstanding {
return Ok(Async::NotReady);
}
// We may want to also make progress on current requests
return Ok(Async::NotReady);
}
Err(e) => {
self.failed("poll_ready", e);
@ -505,57 +407,19 @@ where
Async::Ready(None) => {
// No more more requests _ever_.
self.finish = true;
return Ok(Async::Ready(()));
}
Async::NotReady if self.failed.is_some() => {
// No need to poll the service as it has already failed.
return Ok(Async::NotReady);
}
Async::NotReady if any_outstanding => {
// Make some progress on the service if we can.
}
Async::NotReady => {
// There are no outstanding requests to make progress on.
// And we don't have any new requests to enqueue.
// We don't have any new requests to enqueue.
// So we yield.
return Ok(Async::NotReady);
}
}
if self.finish {
try_ready!(self.service.poll_close().map_err(move |e| {
self.failed("poll_close", e);
}));
// We are all done!
break;
} else {
debug_assert!(any_outstanding);
if let Async::Ready(()) = self.service.poll_service().map_err(|e| {
self.failed("poll_service", e);
})? {
// Note to future iterations that there's no reason to call poll_service.
any_outstanding = false;
} else {
// The service can't make any more progress.
// Let's see how we can have gotten here:
//
// - If poll_next_msg returned NotReady, we should return NotReady.
// - If poll_next_msg returned Ready(None), we'd have self.finish = true,
// but we're in the else clause, so that can't be the case.
// - If poll_next_msg returned Ready(Some) and poll_ready() returned NotReady,
// we should return NotReady here as well, since the service can't make
// progress yet to accept the message.
// - If poll_next_msg returned Ready(Some) and poll_ready() returned Ready,
// we'd have continued, so that can't be the case.
//
// Thus, in all cases when we get to this point, we should return NotReady.
// So:
return Ok(Async::NotReady);
}
}
}
// All senders are dropped... the task is no longer needed
Ok(().into())
}
}