Add backpresure capabilities to `Service` (#6)

Currently, `Service` does not provide a mechanism by which it can signal
to the caller that it is at capacity. This commit adds a `poll_ready`
function to the `Service` trait. Callers are able to first check
`poll_ready` before calling `Service::call`.

`poll_ready` is expected to be a hint and will be implemented in a best
effort fashion. It is permitted for a `Service` to return `Ready` from
`poll_ready` and the next invocation of `Service::call` fails.
This commit is contained in:
Carl Lerche 2017-09-27 10:40:02 -07:00 committed by GitHub
parent 3a0212d460
commit bacb7dbfd2
3 changed files with 250 additions and 27 deletions

View File

@ -16,3 +16,9 @@ readme = "README.md"
[dependencies]
futures = "0.1"
[dev-dependencies]
log = "0.3"
env_logger = "0.4"
tokio-timer = "0.1"
futures-cpupool = "0.1"

172
examples/channel_service.rs Normal file
View File

@ -0,0 +1,172 @@
//! Spawns a task to respond to `Service` requests.
//!
//! The example demonstrates how to implement a service to handle backpressure
//! as well as how to use a service that can reach capacity.
//!
//! A task is used to handle service requests. The requests are dispatched to
//! the task using a channel. The task is implemented such requests can pile up
//! (responses are sent back after a fixed timeout).
#![deny(warnings)]
extern crate futures;
extern crate tokio_timer;
extern crate futures_cpupool;
extern crate tower;
#[macro_use]
extern crate log;
extern crate env_logger;
use tower::{Service, NewService};
use futures::{Future, Stream, IntoFuture, Poll, Async};
use futures::future::{Executor, FutureResult};
use futures::sync::{mpsc, oneshot};
use futures_cpupool::CpuPool;
use tokio_timer::Timer;
use std::io;
use std::time::Duration;
/// Service that dispatches requests to a side task using a channel.
#[derive(Debug)]
pub struct ChannelService {
// Send the request and a oneshot Sender to push the response into.
tx: Sender,
}
type Sender = mpsc::Sender<(String, oneshot::Sender<String>)>;
/// Creates new `ChannelService` services.
#[derive(Debug)]
pub struct NewChannelService {
// The number of requests to buffer
buffer: usize,
// The timer
timer: Timer,
// Executor to spawn the task on
pool: CpuPool,
}
/// Response backed by a oneshot.
#[derive(Debug)]
pub struct ResponseFuture {
rx: Option<oneshot::Receiver<String>>,
}
/// The service error
#[derive(Debug)]
pub enum Error {
AtCapacity,
Failed,
}
impl NewChannelService {
pub fn new(buffer: usize, pool: CpuPool) -> Self {
let timer = Timer::default();
NewChannelService {
buffer,
timer,
pool,
}
}
}
impl NewService for NewChannelService {
type Request = String;
type Response = String;
type Error = Error;
type InitError = io::Error;
type Instance = ChannelService;
type Future = FutureResult<Self::Instance, io::Error>;
fn new_service(&self) -> Self::Future {
let (tx, rx) = mpsc::channel::<(String, oneshot::Sender<String>)>(self.buffer);
let timer = self.timer.clone();
// Create the task that proceses the request
self.pool
.execute(rx.for_each(move |(msg, tx)| {
timer.sleep(Duration::from_millis(500))
.then(move |res| {
res.unwrap();
let _ = tx.send(msg);
Ok(())
})
}))
.map(|_| ChannelService { tx })
.map_err(|_| io::ErrorKind::Other.into())
.into_future()
}
}
impl Service for ChannelService {
type Request = String;
type Response = String;
type Error = Error;
type Future = ResponseFuture;
fn poll_ready(&mut self) -> Poll<(), Error> {
self.tx.poll_ready()
.map_err(|_| Error::Failed)
}
fn call(&mut self, request: String) -> ResponseFuture {
let (tx, rx) = oneshot::channel();
match self.tx.try_send((request, tx)) {
Ok(_) => {}
Err(_) => {
return ResponseFuture { rx: None };
}
}
ResponseFuture { rx: Some(rx) }
}
}
impl Future for ResponseFuture {
type Item = String;
type Error = Error;
fn poll(&mut self) -> Poll<String, Error> {
match self.rx {
Some(ref mut rx) => {
match rx.poll() {
Ok(Async::Ready(v)) => Ok(v.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(Error::Failed),
}
}
None => Err(Error::AtCapacity),
}
}
}
pub fn main() {
env_logger::init().unwrap();
let new_service = NewChannelService::new(5, CpuPool::new(1));
// Get the service
let mut service = new_service.new_service().wait().unwrap();
let mut responses = vec![];
for i in 0..10 {
service = service.ready().wait().unwrap();
info!("sending request; i={}", i);
let request = format!("request={}", i);
responses.push(service.call(request));
}
for response in responses {
println!("response={:?}", response.wait());
}
}

View File

@ -8,9 +8,10 @@
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/tower/0.1")]
#[macro_use]
extern crate futures;
use futures::{Future, IntoFuture};
use futures::{Future, IntoFuture, Poll};
use std::rc::Rc;
use std::sync::Arc;
@ -48,7 +49,11 @@ use std::sync::Arc;
/// type Error = http::Error;
/// type Future = Box<Future<Item = Self::Response, Error = http::Error>>;
///
/// fn call(&self, req: http::Request) -> Self::Future {
/// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
/// Ok(Async::Ready(()))
/// }
///
/// fn call(&mut self, req: http::Request) -> Self::Future {
/// // Create the HTTP response
/// let resp = http::Response::ok()
/// .with_body(b"hello world\n");
@ -122,7 +127,11 @@ use std::sync::Arc;
/// type Error = T::Error;
/// type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
///
/// fn call(&self, req: Self::Req) -> Self::Future {
/// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
/// Ok(Async::Ready(()))
/// }
///
/// fn call(&mut self, req: Self::Req) -> Self::Future {
/// let timeout = self.timer.sleep(self.delay)
/// .and_then(|_| Err(Self::Error::from(Expired)));
///
@ -139,6 +148,16 @@ use std::sync::Arc;
/// The above timeout implementation is decoupled from the underlying protocol
/// and is also decoupled from client or server concerns. In other words, the
/// same timeout middleware could be used in either a client or a server.
///
/// # Backpressure
///
/// Calling an at capacity `Service` (i.e., it temporarily unable to process a
/// request) should result in an error. The caller is responsible for ensuring
/// that the service is ready to receive the request before calling it.
///
/// `Service` provides a mechanism by which the caller is able to coordinate
/// readiness. `Service::poll_ready` returns `Ready` if the service expects that
/// it is able to process a request.
pub trait Service {
/// Requests handled by the service.
@ -153,8 +172,34 @@ pub trait Service {
/// The future response value.
type Future: Future<Item = Self::Response, Error = Self::Error>;
/// A future yielding the service when it is ready to accept a request.
fn ready(self) -> Ready<Self> where Self: Sized {
Ready { inner: Some(self) }
}
/// Returns `Ready` when the service is able to process requests.
///
/// If the service is at capacity, then `NotReady` is returned and the task
/// is notified when the service becomes ready again. This function is
/// expected to be called while on a task.
///
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
/// Process the request and return the response asynchronously.
fn call(&self, req: Self::Request) -> Self::Future;
///
/// This function is expected to be callable off task. As such,
/// implementations should take care to not call `poll_ready`. If the
/// service is at capacity and the request is unable to be handled, the
/// returned `Future` should resolve to an error.
fn call(&mut self, req: Self::Request) -> Self::Future;
}
/// Future yielding a `Service` once the service is ready to process a request
pub struct Ready<T> {
inner: Option<T>,
}
/// Creates new `Service` values.
@ -181,6 +226,24 @@ pub trait NewService {
fn new_service(&self) -> Self::Future;
}
impl<T> Future for Ready<T>
where T: Service,
{
type Item = T;
type Error = T::Error;
fn poll(&mut self) -> Poll<T, T::Error> {
match self.inner {
Some(ref mut service) => {
let _ = try_ready!(service.poll_ready());
}
None => panic!("called `poll` after future completed"),
}
Ok(self.inner.take().unwrap().into())
}
}
impl<F, R, E, S> NewService for F
where F: Fn() -> R,
R: IntoFuture<Item = S, Error = E>,
@ -230,29 +293,11 @@ impl<S: Service + ?Sized> Service for Box<S> {
type Error = S::Error;
type Future = S::Future;
fn call(&self, request: S::Request) -> S::Future {
(**self).call(request)
}
}
impl<S: Service + ?Sized> Service for Rc<S> {
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, request: S::Request) -> S::Future {
(**self).call(request)
}
}
impl<S: Service + ?Sized> Service for Arc<S> {
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, request: S::Request) -> S::Future {
fn poll_ready(&mut self) -> Poll<(), S::Error> {
(**self).poll_ready()
}
fn call(&mut self, request: S::Request) -> S::Future {
(**self).call(request)
}
}