tower-batch: initial implementation of batching logic.

The name "Buffer" is changed to "Batch" everywhere, and the worker task is rewritten.

Instead of having Worker implement Future directly, we have a consuming async run() function (sketched below).
Henry de Valence 2020-06-12 11:42:28 -07:00
parent dcd3f7bb2d
commit ee26e786f7
9 changed files with 185 additions and 203 deletions
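A minimal, self-contained sketch of the consuming-run() pattern this commit adopts (toy names, tokio 0.2-era APIs; this is not the actual Worker type below). State that a hand-written poll() would have to stash in struct fields and re-examine on every wakeup instead lives in ordinary local variables across .await points, and shutdown falls out of the channel closing:

use tokio::sync::mpsc;

struct Worker {
    rx: mpsc::Receiver<u32>,
}

impl Worker {
    // Consumes `self`: all loop state is local, no resumption bookkeeping.
    async fn run(mut self) {
        // recv() returning None means every sender is gone: clean shutdown.
        while let Some(item) = self.rx.recv().await {
            println!("processing item {}", item);
        }
    }
}

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::channel(8);
    // The service side now just spawns the consuming future.
    let task = tokio::spawn(Worker { rx }.run());
    tx.send(42).await.unwrap();
    drop(tx);
    task.await.unwrap();
}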

Cargo.lock (generated)

@@ -1927,11 +1927,13 @@ dependencies = [
name = "tower-batch"
version = "0.1.0"
dependencies = [
"futures",
"futures-core",
"pin-project",
"tokio",
"tower",
"tracing",
"tracing-futures",
]
[[package]]

tower-batch/Cargo.toml

@@ -6,8 +6,10 @@ license = "MIT"
edition = "2018"
[dependencies]
tokio = { version = "0.2", features = ["time"] }
tokio = { version = "0.2", features = ["time", "sync", "stream"] }
tower = "0.3"
futures-core = "0.3.5"
pin-project = "0.4.20"
tracing = "0.1.15"
tracing-futures = "0.2.4"
futures = "0.3.5"

tower-batch/src/error.rs

@@ -1,15 +1,15 @@
//! Error types for the `Buffer` middleware.
//! Error types for the `Batch` middleware.
use crate::BoxError;
use std::{fmt, sync::Arc};
/// An error produced by a `Service` wrapped by a `Buffer`
/// An error produced by a `Service` wrapped by a `Batch`.
#[derive(Debug)]
pub struct ServiceError {
inner: Arc<BoxError>,
}
/// An error produced when the a buffer's worker closes unexpectedly.
/// An error produced when the batch worker closes unexpectedly.
pub struct Closed {
_p: (),
}
@@ -32,7 +32,7 @@ impl ServiceError {
impl fmt::Display for ServiceError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "buffered service failed: {}", self.inner)
write!(fmt, "batching service failed: {}", self.inner)
}
}
@@ -58,7 +58,7 @@ impl fmt::Debug for Closed {
impl fmt::Display for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str("buffer's worker closed unexpectedly")
fmt.write_str("batch worker closed unexpectedly")
}
}
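A hedged caller-side sketch (the function name is illustrative): the Batch service surfaces failures as a boxed error, so callers can distinguish the two cases defined in this module by downcasting, assuming both types implement std::error::Error as in tower's Buffer middleware that this code derives from:

use std::error::Error;

fn describe(err: &(dyn Error + 'static)) -> &'static str {
    if err.is::<Closed>() {
        // The worker task is gone; this service handle will never recover.
        "batch worker closed unexpectedly"
    } else if err.is::<ServiceError>() {
        // The wrapped service failed; the same shared error is reported
        // to every in-flight caller.
        "batching service failed"
    } else {
        "unrelated error"
    }
}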

tower-batch/src/future.rs

@@ -1,4 +1,4 @@
//! Future types for the `Buffer` middleware.
//! Future types for the `Batch` middleware.
use super::{error::Closed, message};
use futures_core::ready;
@@ -9,7 +9,7 @@ use std::{
task::{Context, Poll},
};
/// Future that completes when the buffered service eventually services the submitted request.
/// Future that completes when the batch processing is complete.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {

tower-batch/src/layer.rs

@@ -1,60 +1,56 @@
use super::service::Buffer;
use super::{service::Batch, BatchControl};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
use tower_service::Service;
use tower::layer::Layer;
use tower::Service;
/// Adds an mpsc buffer in front of an inner service.
/// Adds a layer performing batch processing of requests.
///
/// The default Tokio executor is used to run the given service,
/// which means that this layer can only be used on the Tokio runtime.
///
/// See the module documentation for more details.
pub struct BufferLayer<Request> {
bound: usize,
pub struct BatchLayer<Request> {
max_items: usize,
max_latency: std::time::Duration,
_p: PhantomData<fn(Request)>,
}
impl<Request> BufferLayer<Request> {
/// Creates a new `BufferLayer` with the provided `bound`.
impl<Request> BatchLayer<Request> {
/// Creates a new `BatchLayer`.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
/// The wrapper is responsible for telling the inner service when to flush a
/// batch of requests. Two parameters control this policy:
///
/// # A note on choosing a `bound`
///
/// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a
/// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time. As a result, it's advisable to set
/// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see.
/// If you do not, all the slots in the buffer may be held up by futures that have just called
/// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
/// requests.
pub fn new(bound: usize) -> Self {
BufferLayer {
bound,
/// * `max_items` gives the maximum number of items per batch.
/// * `max_latency` gives the maximum latency for a batch item.
pub fn new(max_items: usize, max_latency: std::time::Duration) -> Self {
BatchLayer {
max_items,
max_latency,
_p: PhantomData,
}
}
}
impl<S, Request> Layer<S> for BufferLayer<Request>
impl<S, Request> Layer<S> for BatchLayer<Request>
where
S: Service<Request> + Send + 'static,
S: Service<BatchControl<Request>> + Send + 'static,
S::Future: Send,
S::Error: Into<crate::BoxError> + Send + Sync,
Request: Send + 'static,
{
type Service = Buffer<S, Request>;
type Service = Batch<S, Request>;
fn layer(&self, service: S) -> Self::Service {
Buffer::new(service, self.bound)
Batch::new(service, self.max_items, self.max_latency)
}
}
impl<Request> fmt::Debug for BufferLayer<Request> {
impl<Request> fmt::Debug for BatchLayer<Request> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer")
.field("bound", &self.bound)
.field("max_items", &self.max_items)
.field("max_latency", &self.max_latency)
.finish()
}
}
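A hedged wiring sketch (the helper name is illustrative, and the BoxError alias is restated locally since the crate's alias is not exported): wrap any batch-aware service so that a batch is flushed at 32 items, or 100 ms after its first item, whichever comes first. This must run inside a Tokio runtime, because constructing the layered service spawns the background worker:

use std::time::Duration;
use tower::{Service, ServiceBuilder};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

fn with_batching<S, Item>(inner: S) -> Batch<S, Item>
where
    S: Service<BatchControl<Item>> + Send + 'static,
    S::Future: Send,
    S::Error: Into<BoxError> + Send + Sync,
    Item: Send + 'static,
{
    ServiceBuilder::new()
        // max_items = 32, max_latency = 100ms.
        .layer(BatchLayer::new(32, Duration::from_millis(100)))
        .service(inner)
}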

tower-batch/src/lib.rs

@@ -7,5 +7,16 @@ mod worker;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub use self::layer::BufferLayer;
pub use self::service::Buffer;
pub enum BatchControl<R> {
Item(R),
Flush,
}
impl<R> From<R> for BatchControl<R> {
fn from(req: R) -> BatchControl<R> {
BatchControl::Item(req)
}
}
pub use self::layer::BatchLayer;
pub use self::service::Batch;
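A hedged sketch of a service written against this contract (the type name and the summing "work" are illustrative stand-ins for real batched work, e.g. batched signature verification). Items accumulate on Item; the batched computation runs on Flush. The From impl above is what lets the worker pass a plain request with req.into(). A production service would hand out per-item oneshot channels so each Item's response resolves at flush time:

use std::{future::Future, pin::Pin, task::{Context, Poll}};
use tower::Service;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

struct Summer {
    pending: Vec<u64>,
}

impl Service<BatchControl<u64>> for Summer {
    type Response = u64;
    type Error = BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<u64, BoxError>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: BatchControl<u64>) -> Self::Future {
        match req {
            BatchControl::Item(n) => {
                self.pending.push(n);
                // Immediate acknowledgement; the real answer comes at flush.
                Box::pin(async { Ok(0) })
            }
            BatchControl::Flush => {
                let sum: u64 = self.pending.drain(..).sum();
                Box::pin(async move { Ok(sum) })
            }
        }
    }
}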

tower-batch/src/message.rs

@@ -1,7 +1,7 @@
use super::error::ServiceError;
use tokio::sync::oneshot;
/// Message sent over buffer
/// Message sent to the batch worker
#[derive(Debug)]
pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
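The hunk cuts the struct off here; a reconstructed sketch of its likely full shape, inferred from its construction in service.rs (`Message { request, span, tx }`) and from worker.rs referring to `message::Tx<T::Future>`. The oneshot sender carries either the inner service's response future or the worker's shared error:

use tokio::sync::oneshot;
use tracing::Span;

pub(crate) struct Message<Request, Fut> {
    pub(crate) request: Request,
    pub(crate) tx: Tx<Fut>,
    pub(crate) span: Span,
}

// Alias for the response channel back to the caller.
pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;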

tower-batch/src/service.rs

@@ -2,6 +2,7 @@ use super::{
future::ResponseFuture,
message::Message,
worker::{Handle, Worker},
BatchControl,
};
use futures_core::ready;
@@ -9,67 +10,45 @@ use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use tower::Service;
/// Adds an mpsc buffer in front of an inner service.
/// Allows batch processing of requests.
///
/// See the module documentation for more details.
#[derive(Debug)]
pub struct Buffer<T, Request>
pub struct Batch<T, Request>
where
T: Service<Request>,
T: Service<BatchControl<Request>>,
{
tx: mpsc::Sender<Message<Request, T::Future>>,
handle: Handle,
}
impl<T, Request> Buffer<T, Request>
impl<T, Request> Batch<T, Request>
where
T: Service<Request>,
T: Service<BatchControl<Request>>,
T::Error: Into<crate::BoxError>,
{
/// Creates a new `Buffer` wrapping `service`.
/// Creates a new `Batch` wrapping `service`.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
/// The wrapper is responsible for telling the inner service when to flush a
/// batch of requests. Two parameters control this policy:
///
/// The default Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime.
/// * `max_items` gives the maximum number of items per batch.
/// * `max_latency` gives the maximum latency for a batch item.
///
/// # A note on choosing a `bound`
///
/// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a
/// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time. As a result, it's advisable to set
/// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see.
/// If you do not, all the slots in the buffer may be held up by futures that have just called
/// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
/// requests.
pub fn new(service: T, bound: usize) -> Self
/// The default Tokio executor is used to run the given service, which means
/// that this method must be called while on the Tokio runtime.
pub fn new(service: T, max_items: usize, max_latency: std::time::Duration) -> Self
where
T: Send + 'static,
T::Future: Send,
T::Error: Send + Sync,
Request: Send + 'static,
{
let (tx, rx) = mpsc::channel(bound);
let (handle, worker) = Worker::new(service, rx);
tokio::spawn(worker);
Buffer { tx, handle }
}
/// Creates a new `Buffer` wrapping `service`, but returns the background worker.
///
/// This is useful if you do not want to spawn directly onto the `tokio` runtime
/// but instead want to use your own executor. This will return the `Buffer` and
/// the background `Worker` that you can then spawn.
pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
where
T: Send + 'static,
T::Error: Send + Sync,
Request: Send + 'static,
{
let (tx, rx) = mpsc::channel(bound);
let (handle, worker) = Worker::new(service, rx);
(Buffer { tx, handle }, worker)
// XXX(hdevalence): is this bound good
let (tx, rx) = mpsc::channel(1);
let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
tokio::spawn(worker.run());
Batch { tx, handle }
}
fn get_worker_error(&self) -> crate::BoxError {
@@ -77,9 +56,9 @@ where
}
}
impl<T, Request> Service<Request> for Buffer<T, Request>
impl<T, Request> Service<Request> for Batch<T, Request>
where
T: Service<Request>,
T: Service<BatchControl<Request>>,
T::Error: Into<crate::BoxError>,
{
type Response = T::Response;
@@ -106,7 +85,7 @@ where
// if we didn't do this, events on the worker related to this span wouldn't be counted
// towards that span since the worker would have no way of entering it.
let span = tracing::Span::current();
tracing::trace!(parent: &span, "sending request to buffer worker");
tracing::trace!(parent: &span, "sending request to batch worker");
match self.tx.try_send(Message { request, span, tx }) {
Err(mpsc::error::TrySendError::Closed(_)) => {
ResponseFuture::failed(self.get_worker_error())
@@ -126,9 +105,9 @@ where
}
}
impl<T, Request> Clone for Buffer<T, Request>
impl<T, Request> Clone for Batch<T, Request>
where
T: Service<Request>,
T: Service<BatchControl<Request>>,
{
fn clone(&self) -> Self {
Self {
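A hedged usage sketch (the helper name is illustrative): a Batch<S, R> is driven like any tower service. poll_ready reserves a slot in the worker's channel, call only enqueues the request, and the returned future resolves once the worker has handed the request to the inner service and that service's future completes (for a batch item, typically when the batch is flushed):

use tower::{Service, ServiceExt};

async fn submit<S, R>(mut svc: S, req: R) -> Result<S::Response, S::Error>
where
    S: Service<R>,
{
    // ready_and() drives poll_ready; call() sends the Message to the worker.
    svc.ready_and().await?.call(req).await
}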

tower-batch/src/worker.rs

@@ -1,17 +1,18 @@
use super::{
error::{Closed, ServiceError},
message::Message,
message::{self, Message},
BatchControl,
};
use futures_core::ready;
use futures::future::TryFutureExt;
use pin_project::pin_project;
use std::sync::{Arc, Mutex};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
use tokio::{
stream::StreamExt,
sync::mpsc,
time::{delay_for, Delay},
};
use tokio::sync::mpsc;
use tower::Service;
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
/// Task that handles processing the buffer. This type should not be used
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
@@ -24,15 +25,15 @@ use tower::Service;
#[derive(Debug)]
pub struct Worker<T, Request>
where
T: Service<Request>,
T: Service<BatchControl<Request>>,
T::Error: Into<crate::BoxError>,
{
current_message: Option<Message<Request, T::Future>>,
rx: mpsc::Receiver<Message<Request, T::Future>>,
service: T,
finish: bool,
failed: Option<ServiceError>,
handle: Handle,
max_items: usize,
max_latency: std::time::Duration,
}
/// Get the error out
@@ -43,66 +44,125 @@ pub(crate) struct Handle {
impl<T, Request> Worker<T, Request>
where
T: Service<Request>,
T: Service<BatchControl<Request>>,
T::Error: Into<crate::BoxError>,
{
pub(crate) fn new(
service: T,
rx: mpsc::Receiver<Message<Request, T::Future>>,
max_items: usize,
max_latency: std::time::Duration,
) -> (Handle, Worker<T, Request>) {
let handle = Handle {
inner: Arc::new(Mutex::new(None)),
};
let worker = Worker {
current_message: None,
finish: false,
failed: None,
rx,
service,
handle: handle.clone(),
failed: None,
max_items,
max_latency,
};
(handle, worker)
}
/// Return the next queued Message that hasn't been canceled.
///
/// If a `Message` is returned, the `bool` is true if this is the first time we received this
/// message, and false otherwise (i.e., we tried to forward it to the backing service before).
fn poll_next_msg(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
if self.finish {
// We've already received None and are shutting down
return Poll::Ready(None);
}
tracing::trace!("worker polling for next message");
if let Some(mut msg) = self.current_message.take() {
// poll_closed returns Poll::Ready if the receiver is dropped.
// Returning Pending means it is still alive, so we should still
// use it.
if msg.tx.poll_closed(cx).is_pending() {
tracing::trace!("resuming buffered request");
return Poll::Ready(Some((msg, false)));
async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future>) {
if let Some(ref failed) = self.failed {
tracing::trace!("notifying caller about worker failure");
let _ = tx.send(Err(failed.clone()));
} else {
match self.service.ready_and().await {
Ok(svc) => {
let rsp = svc.call(req.into());
let _ = tx.send(Ok(rsp));
}
Err(e) => {
self.failed(e.into());
let _ = tx.send(Err(self
.failed
.as_ref()
.expect("Worker::failed did not set self.failed?")
.clone()));
}
}
tracing::trace!("dropping cancelled buffered request");
}
}
// Get the next request
while let Some(mut msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
if msg.tx.poll_closed(cx).is_pending() {
tracing::trace!("processing new request");
return Poll::Ready(Some((msg, true)));
async fn flush_service(&mut self) {
if let Err(e) = self
.service
.ready_and()
.and_then(|svc| svc.call(BatchControl::Flush))
.await
{
self.failed(e.into());
}
}
pub async fn run(mut self) {
use futures::future::Either::{Left, Right};
// The timer is started when the first entry of a new batch is
// submitted, so that the batch latency of all entries is at most
// self.max_latency. However, we don't keep the timer running unless
// there is a pending request to prevent wakeups on idle services.
let mut timer: Option<Delay> = None;
let mut pending_items = 0usize;
loop {
match timer {
None => match self.rx.next().await {
// The first message in a new batch.
Some(msg) => {
let span = msg.span;
self.process_req(msg.request, msg.tx)
// Apply the provided span to request processing
.instrument(span)
.await;
timer = Some(delay_for(self.max_latency));
pending_items = 1;
}
// No more messages, ever.
None => return,
},
Some(delay) => {
// Wait on either a new message or the batch timer.
match futures::future::select(self.rx.next(), delay).await {
Left((Some(msg), delay)) => {
let span = msg.span;
self.process_req(msg.request, msg.tx)
// Apply the provided span to request processing.
.instrument(span)
.await;
pending_items += 1;
// Check whether we have too many pending items.
if pending_items >= self.max_items {
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
// Now we have an empty batch.
timer = None;
pending_items = 0;
} else {
// The timer is still running, set it back!
timer = Some(delay);
}
}
// No more messages, ever.
Left((None, _delay)) => {
return;
}
// The batch timer elapsed.
Right(((), _next)) => {
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
timer = None;
pending_items = 0;
}
}
}
}
// Otherwise, request is canceled, so pop the next one.
tracing::trace!("dropping cancelled request");
}
Poll::Ready(None)
}
fn failed(&mut self, error: crate::BoxError) {
@@ -132,82 +192,14 @@ where
self.rx.close();
// By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None),
// which will trigger the `self.finish == true` phase. We just need to make sure that any
// requests that we receive before we've exhausted the receiver receive the error:
// By closing the mpsc::Receiver, we know that the run() loop will
// drain all pending requests. We just need to make sure that any
// requests that we receive before we've exhausted the receiver receive
// the error:
self.failed = Some(error);
}
}
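A stand-alone toy (not the real worker) demonstrating the select pattern the run() loop relies on: race the next message against the batch timer, and when a message wins, put the same unelapsed timer back, so batch latency is measured from the first item of a batch rather than the last. The Left/Right arms mirror the ones in run() above:

use std::time::Duration;
use futures::future::{select, Either};
use tokio::{stream::StreamExt, sync::mpsc, time::delay_for};

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel::<u32>(8);
    tokio::spawn(async move {
        tx.send(1).await.unwrap();
        // Keep the channel open past the deadline so the timer wins the
        // second race.
        delay_for(Duration::from_millis(200)).await;
    });

    // Started with the first item; never restarted by later items.
    let mut timer = delay_for(Duration::from_millis(50));
    loop {
        match select(rx.next(), timer).await {
            Either::Left((Some(msg), unelapsed)) => {
                println!("item {} arrived before the deadline", msg);
                timer = unelapsed; // same deadline as before
            }
            Either::Left((None, _)) => {
                println!("all senders dropped");
                return;
            }
            Either::Right(((), _pending_next)) => {
                println!("deadline elapsed: flush the batch here");
                return;
            }
        }
    }
}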
impl<T, Request> Future for Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<crate::BoxError>,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.finish {
return Poll::Ready(());
}
loop {
match ready!(self.poll_next_msg(cx)) {
Some((msg, first)) => {
let _guard = msg.span.enter();
if let Some(ref failed) = self.failed {
tracing::trace!("notifying caller about worker failure");
let _ = msg.tx.send(Err(failed.clone()));
continue;
}
// Wait for the service to be ready
tracing::trace!(
resumed = !first,
message = "worker received request; waiting for service readiness"
);
match self.service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
tracing::debug!(service.ready = true, message = "processing request");
let response = self.service.call(msg.request);
// Send the response future back to the sender.
//
// An error means the request had been canceled in-between
// our calls, the response future will just be dropped.
tracing::trace!("returning response future");
let _ = msg.tx.send(Ok(response));
}
Poll::Pending => {
tracing::trace!(service.ready = false, message = "delay");
// Put out current message back in its slot.
drop(_guard);
self.current_message = Some(msg);
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
let error = e.into();
tracing::debug!({ %error }, "service failed");
drop(_guard);
self.failed(error);
let _ = msg.tx.send(Err(self
.failed
.as_ref()
.expect("Worker::failed did not set self.failed?")
.clone()));
}
}
}
None => {
// No more requests _ever_.
self.finish = true;
return Poll::Ready(());
}
}
}
}
}
impl Handle {
pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
self.inner