buffer: check for canceled requests before polling inner service (#72)

Sean McArthur 2018-05-14 10:06:06 -07:00 committed by GitHub
parent 295ae583d4
commit 6cdc8d0ab5
2 changed files with 137 additions and 29 deletions
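For context before the diff: the fix hinges on `oneshot::Sender::poll_cancel` from futures 0.1, which stays `Async::NotReady` while the matching `Receiver` is alive and resolves to `Async::Ready(())` once it has been dropped. A minimal standalone sketch of that behavior (illustrative only, not part of this commit):

extern crate futures;

use futures::future::{lazy, Future};
use futures::sync::oneshot;
use futures::Async;

fn main() {
    // `poll_cancel` must run inside a task context; `lazy(..).wait()`
    // provides one, much like the `with_task` helper in the new test file.
    lazy(|| {
        let (mut tx, rx) = oneshot::channel::<&'static str>();

        // The receiver is still alive, so the request is not canceled yet.
        assert_eq!(tx.poll_cancel(), Ok(Async::NotReady));

        // Dropping the receiver is how a caller abandons its request...
        drop(rx);

        // ...and the sender can now observe the cancellation.
        assert_eq!(tx.poll_cancel(), Ok(Async::Ready(())));

        Ok::<_, ()>(())
    }).wait().unwrap();
}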


@@ -9,6 +9,7 @@
//! good thing to put in production situations. However, it illustrates the idea
//! and capabilities around adding buffering to an arbitrary `Service`.
#[macro_use]
extern crate futures;
extern crate tower_service;
@@ -21,7 +22,7 @@ use tower_service::Service;
use std::{error, fmt};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::Ordering;
/// Adds a buffer in front of an inner service.
///
@@ -53,8 +54,9 @@ pub enum Error<T> {
pub struct Worker<T>
where T: Service,
{
service: T,
current_message: Option<Message<T>>,
rx: UnboundedReceiver<Message<T>>,
service: T,
state: Arc<State>,
}
@@ -99,8 +101,9 @@ where T: Service,
});
let worker = Worker {
service,
current_message: None,
rx,
service,
state: state.clone(),
};
@@ -125,7 +128,7 @@ where T: Service,
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// If the inner service has errored, then we error here.
if !self.state.open.load(SeqCst) {
if !self.state.open.load(Ordering::Acquire) {
return Err(Error::Closed);
} else {
// Ideally we could query if the `mpsc` is closed, but this is not
@@ -137,12 +140,15 @@ where T: Service,
fn call(&mut self, request: Self::Request) -> Self::Future {
let (tx, rx) = oneshot::channel();
// TODO: This could fail too
let _ = self.tx.unbounded_send(Message {
let sent = self.tx.unbounded_send(Message {
request,
tx,
});
if sent.is_err() {
self.state.open.store(false, Ordering::Release);
}
ResponseFuture { state: ResponseState::Rx(rx) }
}
}
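A quick aside on the new `sent.is_err()` branch above: an unbounded mpsc send only fails once the receiving side has been dropped, i.e. once the worker is gone, so a failed send is the signal to mark the buffer closed. A tiny illustration of that property (assumed futures 0.1 semantics, not from this commit):

extern crate futures;

use futures::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::unbounded::<u32>();

    // Once the receiver (standing in for the worker here) is gone,
    // every send fails; the buffer uses this to flip `state.open` to
    // false so later `poll_ready` calls return `Error::Closed`.
    drop(rx);
    assert!(tx.unbounded_send(1).is_err());
}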
@@ -181,6 +187,32 @@ where T: Service
// ===== impl Worker =====
impl<T> Worker<T>
where T: Service
{
/// Return the next queued Message that hasn't been canceled.
fn poll_next_msg(&mut self) -> Poll<Option<Message<T>>, ()> {
if let Some(mut msg) = self.current_message.take() {
// poll_cancel returns Async::Ready if the receiver is dropped.
// Returning NotReady means it is still alive, so we should still
// use it.
if msg.tx.poll_cancel()?.is_not_ready() {
return Ok(Async::Ready(Some(msg)));
}
}
// Get the next request
while let Some(mut msg) = try_ready!(self.rx.poll()) {
if msg.tx.poll_cancel()?.is_not_ready() {
return Ok(Async::Ready(Some(msg)));
}
// Otherwise, the request was canceled, so pop the next one.
}
Ok(Async::Ready(None))
}
}
impl<T> Future for Worker<T>
where T: Service,
{
@@ -188,39 +220,33 @@ where T: Service,
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
while let Some(msg) = try_ready!(self.poll_next_msg()) {
// Wait for the service to be ready
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {}
Ok(Async::Ready(())) => {
let response = self.service.call(msg.request);
// Send the response future back to the sender.
//
// An error means the request was canceled in between
// our calls, so the response future will just be dropped.
let _ = msg.tx.send(response);
}
Ok(Async::NotReady) => {
// Put our current message back in its slot.
self.current_message = Some(msg);
return Ok(Async::NotReady);
}
Err(_) => {
self.state.open.store(false, SeqCst);
self.state.open.store(false, Ordering::Release);
return Ok(().into())
}
}
// Get the next request
match self.rx.poll() {
Ok(Async::Ready(Some(Message { request, tx }))) => {
// Received a request. Dispatch the request to the inner service
// and get the response future
let response = self.service.call(request);
// Send the response future back to the sender.
//
// TODO: how should send errors be handled?
let _ = tx.send(response);
}
Ok(Async::Ready(None)) => {
// All senders are dropped... the task is no longer needed
return Ok(().into());
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
}
}
// All senders are dropped... the task is no longer needed
Ok(().into())
}
}
@@ -252,7 +278,7 @@ where
fn description(&self) -> &str {
match *self {
Error::Inner(_) => "inner service error",
Error::Inner(ref e) => e.description(),
Error::Closed => "buffer closed",
}
}
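Before the new test file, it may help to see the worker's queue-draining step in isolation: pull messages off the channel and silently skip any whose response receiver has already been dropped. A self-contained sketch of that pattern (illustrative names such as `poll_next_live`, assuming the same futures 0.1 APIs used above):

#[macro_use]
extern crate futures;

use futures::future::{lazy, Future};
use futures::sync::{mpsc, oneshot};
use futures::{Async, Poll, Stream};

type Msg = (&'static str, oneshot::Sender<&'static str>);

// Like `Worker::poll_next_msg`: yield the next message whose
// receiver is still alive, dropping canceled ones along the way.
fn poll_next_live(rx: &mut mpsc::UnboundedReceiver<Msg>) -> Poll<Option<Msg>, ()> {
    while let Some((req, mut tx)) = try_ready!(rx.poll()) {
        // `poll_cancel` is `NotReady` while the receiver is alive.
        if tx.poll_cancel()?.is_not_ready() {
            return Ok(Async::Ready(Some((req, tx))));
        }
        // Canceled: fall through and inspect the next queued message.
    }
    Ok(Async::Ready(None))
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded::<Msg>();

    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    tx.unbounded_send(("canceled", tx1)).unwrap();
    tx.unbounded_send(("live", tx2)).unwrap();
    drop(rx1); // the first caller gave up on its request

    lazy(move || {
        match poll_next_live(&mut rx) {
            // The canceled message was skipped over entirely.
            Ok(Async::Ready(Some((req, _tx)))) => assert_eq!(req, "live"),
            _ => panic!("expected the live message"),
        }
        drop(rx2);
        Ok::<_, ()>(())
    }).wait().unwrap();
}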


@@ -0,0 +1,82 @@
extern crate futures;
extern crate tower_buffer;
extern crate tower_mock;
extern crate tower_service;
use futures::prelude::*;
use tower_buffer::*;
use tower_service::*;
use std::thread;
#[test]
fn req_and_res() {
let (mut service, mut handle) = new_service();
let response = service.call("hello");
let request = handle.next_request().unwrap();
assert_eq!(*request, "hello");
request.respond("world");
assert_eq!(response.wait().unwrap(), "world");
}
#[test]
fn clears_canceled_requests() {
let (mut service, mut handle) = new_service();
handle.allow(1);
let res1 = service.call("hello");
let req1 = handle.next_request().unwrap();
assert_eq!(*req1, "hello");
// don't respond yet, new requests will get buffered
let res2 = service.call("hello2");
with_task(|| {
assert!(handle.poll_request().unwrap().is_not_ready());
});
let res3 = service.call("hello3");
drop(res2);
req1.respond("world");
assert_eq!(res1.wait().unwrap(), "world");
// res2 was dropped, so it should have been canceled in the buffer
handle.allow(1);
let req3 = handle.next_request().unwrap();
assert_eq!(*req3, "hello3");
req3.respond("world3");
assert_eq!(res3.wait().unwrap(), "world3");
}
type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
type Handle = tower_mock::Handle<&'static str, &'static str, ()>;
struct Exec;
impl futures::future::Executor<Worker<Mock>> for Exec {
fn execute(&self, fut: Worker<Mock>) -> Result<(), futures::future::ExecuteError<Worker<Mock>>> {
thread::spawn(move || {
fut.wait().unwrap();
});
Ok(())
}
}
fn new_service() -> (Buffer<Mock>, Handle) {
let (service, handle) = Mock::new();
let service = Buffer::new(service, &Exec).unwrap();
(service, handle)
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::{Future, lazy};
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}