Make `Buffer` use a bounded channel (#120)

This change moves `Buffer` from `mpsc::unbounded` to `mpsc::channel`. The primary motivation is that bounded channels provide back-pressure to callers, so that, for example, `Balance<Buffer>` works as expected. Currently, `Buffer` accepts every request callers can throw at it without ever slowing them down, slowly eating up all your memory.
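For illustration, here is a minimal sketch of the new constructor from the caller's side, assuming the `Service<Request>` trait shape used in this repo; `Echo` is a made-up service, and this code is not part of the commit:

```rust
extern crate futures;
extern crate tokio;
extern crate tower_buffer;
extern crate tower_service;

use futures::{future, Async, Future, Poll};
use tower_buffer::Buffer;
use tower_service::Service;

// A made-up service that is always ready and echoes its request back.
struct Echo;

impl Service<String> for Echo {
    type Response = String;
    type Error = ();
    type Future = future::FutureResult<String, ()>;

    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
        Ok(Async::Ready(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        future::ok(req)
    }
}

fn main() {
    // `Buffer::new` spawns its worker on the default executor, so it has
    // to run inside a Tokio runtime.
    tokio::run(future::lazy(|| {
        // The new second argument is the channel bound: once that many
        // requests are queued, `poll_ready` returns `NotReady` instead of
        // buffering more, which is what gives callers back-pressure.
        let mut svc = Buffer::new(Echo, 10).expect("failed to spawn worker");
        svc.call("hello".to_string())
            .map(|rsp| println!("{}", rsp))
            .map_err(|_| eprintln!("buffer worker is gone"))
    }));
}
```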
Jon Gjengset, 2018-12-05 11:45:53 -05:00, committed by GitHub
parent 801adb18db, commit 6377702087
3 changed files with 39 additions and 26 deletions

tower-balance/examples/demo.rs

@@ -213,7 +213,7 @@ impl Discover for Disco {
     fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
         match self.changes.pop_front() {
             Some(Change::Insert(k, svc)) => {
-                let svc = Buffer::new(svc).unwrap();
+                let svc = Buffer::new(svc, 0).unwrap();
                 let svc = InFlightLimit::new(svc, ENDPOINT_CAPACITY);
                 Ok(Async::Ready(Change::Insert(k, svc)))
             }
@@ -249,7 +249,7 @@ where
     pub fn new(lb: lb::Balance<D, C>, total: usize, concurrency: usize) -> Self {
         Self {
             send_remaining: total,
-            lb: InFlightLimit::new(Buffer::new(lb).ok().expect("buffer"), concurrency),
+            lb: InFlightLimit::new(Buffer::new(lb, 0).ok().expect("buffer"), concurrency),
             responses: stream::FuturesUnordered::new(),
         }
     }

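One subtlety with the `0` bound used in the example above: futures 0.1's `mpsc::channel` reserves one slot per `Sender` on top of the requested bound, so a zero bound still lets each clone of the `Buffer` queue a single request before back-pressure kicks in. A small sketch of that underlying channel behavior (assumed futures 0.1 semantics, not code from this commit):

```rust
extern crate futures;

use futures::sync::mpsc;

fn main() {
    // futures 0.1: effective capacity = bound + number of senders, so even
    // `channel(0)` leaves one guaranteed slot for this lone `Sender`.
    let (mut tx, _rx) = mpsc::channel::<u32>(0);
    assert!(tx.try_send(1).is_ok()); // fills the sender's guaranteed slot
    assert!(tx.try_send(2).is_err()); // channel full: caller sees back-pressure
}
```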
tower-buffer/src/lib.rs

@@ -4,10 +4,6 @@
 //! out of the buffer and dispatching them to the inner service. By adding a
 //! buffer and a dedicated task, the `Buffer` layer in front of the service can
 //! be `Clone` even if the inner service is not.
-//!
-//! Currently, `Buffer` uses an unbounded buffering strategy, which is not a
-//! good thing to put in production situations. However, it illustrates the idea
-//! and capabilities around adding buffering to an arbitrary `Service`.
 
 #[macro_use]
 extern crate futures;
@@ -16,8 +12,8 @@ extern crate tokio_executor;
 extern crate tower_direct_service;
 
 use futures::future::Executor;
+use futures::sync::mpsc;
 use futures::sync::oneshot;
-use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
 use futures::{Async, Future, Poll, Stream};
 use std::marker::PhantomData;
 use std::sync::atomic::AtomicBool;
@@ -35,7 +31,7 @@ pub struct Buffer<T, Request>
 where
     T: Service<Request>,
 {
-    tx: UnboundedSender<Message<Request, T::Future>>,
+    tx: mpsc::Sender<Message<Request, T::Future>>,
     state: Arc<State>,
 }
@@ -125,7 +121,7 @@ mod sealed {
         T: DirectService<Request>,
     {
         pub(crate) current_message: Option<Message<Request, T::Future>>,
-        pub(crate) rx: UnboundedReceiver<Message<Request, T::Future>>,
+        pub(crate) rx: mpsc::Receiver<Message<Request, T::Future>>,
         pub(crate) service: T,
         pub(crate) finish: bool,
         pub(crate) state: Arc<State>,
@@ -165,6 +161,7 @@ struct State {
 }
 
 enum ResponseState<T> {
+    Failed,
     Rx(oneshot::Receiver<T>),
     Poll(T),
 }
@@ -175,15 +172,18 @@ where
 {
     /// Creates a new `Buffer` wrapping `service`.
     ///
+    /// `bound` gives the maximal number of requests that can be queued for the service before
+    /// backpressure is applied to callers.
+    ///
     /// The default Tokio executor is used to run the given service, which means that this method
     /// must be called while on the Tokio runtime.
-    pub fn new(service: T) -> Result<Self, SpawnError<T>>
+    pub fn new(service: T, bound: usize) -> Result<Self, SpawnError<T>>
     where
         T: Send + 'static,
         T::Future: Send,
         Request: Send + 'static,
     {
-        Self::with_executor(service, &DefaultExecutor::current())
+        Self::with_executor(service, bound, &DefaultExecutor::current())
     }
 
     /// Creates a new `Buffer` wrapping `service`.
@@ -191,11 +191,14 @@ where
     /// `executor` is used to spawn a new `Worker` task that is dedicated to
     /// draining the buffer and dispatching the requests to the internal
     /// service.
-    pub fn with_executor<E>(service: T, executor: &E) -> Result<Self, SpawnError<T>>
+    ///
+    /// `bound` gives the maximal number of requests that can be queued for the service before
+    /// backpressure is applied to callers.
+    pub fn with_executor<E>(service: T, bound: usize, executor: &E) -> Result<Self, SpawnError<T>>
     where
         E: WorkerExecutor<DirectedService<T>, Request>,
     {
-        let (tx, rx) = mpsc::unbounded();
+        let (tx, rx) = mpsc::channel(bound);
 
         let state = Arc::new(State {
             open: AtomicBool::new(true),
@@ -229,11 +232,14 @@ where
     /// `executor` is used to spawn a new `Worker` task that is dedicated to
     /// draining the buffer and dispatching the requests to the internal
     /// service.
-    pub fn new_direct<E>(service: T, executor: &E) -> Result<Self, SpawnError<T>>
+    ///
+    /// `bound` gives the maximal number of requests that can be queued for the service before
+    /// backpressure is applied to callers.
+    pub fn new_direct<E>(service: T, bound: usize, executor: &E) -> Result<Self, SpawnError<T>>
     where
         E: Executor<Worker<T, Request>>,
     {
-        let (tx, rx) = mpsc::unbounded();
+        let (tx, rx) = mpsc::channel(bound);
 
         let state = Arc::new(State {
             open: AtomicBool::new(true),
@@ -267,25 +273,28 @@ where
         if !self.state.open.load(Ordering::Acquire) {
             return Err(Error::Closed);
         } else {
-            // Ideally we could query if the `mpsc` is closed, but this is not
-            // currently possible.
-            Ok(().into())
+            self.tx.poll_ready().map_err(|_| Error::Closed)
         }
     }
 
     fn call(&mut self, request: Request) -> Self::Future {
+        // TODO:
+        // ideally we'd poll_ready again here so we don't allocate the oneshot
+        // if the try_send is about to fail, but sadly we can't call poll_ready
+        // outside of task context.
         let (tx, rx) = oneshot::channel();
-        let sent = self.tx.unbounded_send(Message {
-            request,
-            tx,
-        });
+        let sent = self.tx.try_send(Message { request, tx });
         if sent.is_err() {
             self.state.open.store(false, Ordering::Release);
+            ResponseFuture {
+                state: ResponseState::Failed,
+            }
+        } else {
+            ResponseFuture {
+                state: ResponseState::Rx(rx),
+            }
         }
-
-        ResponseFuture { state: ResponseState::Rx(rx) }
     }
 }
@@ -317,6 +326,9 @@ where
         let fut;
 
         match self.state {
+            Failed => {
+                return Err(Error::Closed);
+            }
             Rx(ref mut rx) => {
                 match rx.poll() {
                     Ok(Async::Ready(f)) => fut = f,

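The `poll_ready` change in the hunks above is where the new back-pressure actually surfaces: the buffer now mirrors the channel's readiness instead of always claiming to be ready. A sketch of a driver loop that honors that contract (`pump` is a made-up helper, not part of this commit):

```rust
extern crate futures;
extern crate tower_service;

use futures::{Async, Poll};
use tower_service::Service;

/// Drain `pending` into `svc`, stopping as soon as the service reports
/// `NotReady`. With the old unbounded `Buffer` the `NotReady` arm was
/// unreachable; with a bounded one it parks this task until the worker
/// drains the channel.
fn pump<S>(svc: &mut S, pending: &mut Vec<String>) -> Poll<(), S::Error>
where
    S: Service<String>,
{
    while !pending.is_empty() {
        // Reserve a slot before taking a request off the queue.
        match svc.poll_ready()? {
            Async::Ready(()) => {}
            Async::NotReady => return Ok(Async::NotReady),
        }
        let req = pending.remove(0);
        // The returned response future would be spawned or stored
        // elsewhere; it is dropped here to keep the sketch short.
        drop(svc.call(req));
    }
    Ok(Async::Ready(()))
}
```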
tower-buffer/tests/buffer.rs

@@ -100,7 +100,8 @@ where
 fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
     let (service, handle) = Mock::new();
-    let service = Buffer::with_executor(service, &Exec).unwrap();
+    // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
+    let service = Buffer::with_executor(service, 10, &Exec).unwrap();
     (service, handle)
 }