diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index f961638..b2876d3 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -9,6 +9,7 @@ //! good thing to put in production situations. However, it illustrates the idea //! and capabilities around adding buffering to an arbitrary `Service`. +#[macro_use] extern crate futures; extern crate tower_service; @@ -21,7 +22,7 @@ use tower_service::Service; use std::{error, fmt}; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::Ordering; /// Adds a buffer in front of an inner service. /// @@ -53,8 +54,9 @@ pub enum Error { pub struct Worker where T: Service, { - service: T, + current_message: Option>, rx: UnboundedReceiver>, + service: T, state: Arc, } @@ -99,8 +101,9 @@ where T: Service, }); let worker = Worker { - service, + current_message: None, rx, + service, state: state.clone(), }; @@ -125,7 +128,7 @@ where T: Service, fn poll_ready(&mut self) -> Poll<(), Self::Error> { // If the inner service has errored, then we error here. - if !self.state.open.load(SeqCst) { + if !self.state.open.load(Ordering::Acquire) { return Err(Error::Closed); } else { // Ideally we could query if the `mpsc` is closed, but this is not @@ -137,12 +140,15 @@ where T: Service, fn call(&mut self, request: Self::Request) -> Self::Future { let (tx, rx) = oneshot::channel(); - // TODO: This could fail too - let _ = self.tx.unbounded_send(Message { + let sent = self.tx.unbounded_send(Message { request, tx, }); + if sent.is_err() { + self.state.open.store(false, Ordering::Release); + } + ResponseFuture { state: ResponseState::Rx(rx) } } } @@ -181,6 +187,32 @@ where T: Service // ===== impl Worker ===== +impl Worker +where T: Service +{ + /// Return the next queued Message that hasn't been canceled. + fn poll_next_msg(&mut self) -> Poll>, ()> { + if let Some(mut msg) = self.current_message.take() { + // poll_cancel returns Async::Ready is the receiver is dropped. + // Returning NotReady means it is still alive, so we should still + // use it. + if msg.tx.poll_cancel()?.is_not_ready() { + return Ok(Async::Ready(Some(msg))); + } + } + + // Get the next request + while let Some(mut msg) = try_ready!(self.rx.poll()) { + if msg.tx.poll_cancel()?.is_not_ready() { + return Ok(Async::Ready(Some(msg))); + } + // Otherwise, request is canceled, so pop the next one. + } + + Ok(Async::Ready(None)) + } +} + impl Future for Worker where T: Service, { @@ -188,39 +220,33 @@ where T: Service, type Error = (); fn poll(&mut self) -> Poll<(), ()> { - loop { + while let Some(msg) = try_ready!(self.poll_next_msg()) { // Wait for the service to be ready match self.service.poll_ready() { - Ok(Async::Ready(_)) => {} + Ok(Async::Ready(())) => { + let response = self.service.call(msg.request); + + // Send the response future back to the sender. + // + // An error means the request had been canceled in-between + // our calls, the response future will just be dropped. + let _ = msg.tx.send(response); + } Ok(Async::NotReady) => { + // Put out current message back in its slot. + self.current_message = Some(msg); return Ok(Async::NotReady); } Err(_) => { - self.state.open.store(false, SeqCst); + self.state.open.store(false, Ordering::Release); return Ok(().into()) } } - // Get the next request - match self.rx.poll() { - Ok(Async::Ready(Some(Message { request, tx }))) => { - // Received a request. Dispatch the request to the inner service - // and get the response future - let response = self.service.call(request); - - // Send the response future back to the sender. - // - // TODO: how should send errors be handled? - let _ = tx.send(response); - } - Ok(Async::Ready(None)) => { - // All senders are dropped... the task is no longer needed - return Ok(().into()); - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(()) => unreachable!(), - } } + + // All senders are dropped... the task is no longer needed + Ok(().into()) } } @@ -252,7 +278,7 @@ where fn description(&self) -> &str { match *self { - Error::Inner(_) => "inner service error", + Error::Inner(ref e) => e.description(), Error::Closed => "buffer closed", } } diff --git a/tower-buffer/tests/buffer.rs b/tower-buffer/tests/buffer.rs new file mode 100644 index 0000000..fd1c3ca --- /dev/null +++ b/tower-buffer/tests/buffer.rs @@ -0,0 +1,82 @@ +extern crate futures; +extern crate tower_buffer; +extern crate tower_mock; +extern crate tower_service; + +use futures::prelude::*; +use tower_buffer::*; +use tower_service::*; + +use std::thread; + +#[test] +fn req_and_res() { + let (mut service, mut handle) = new_service(); + + let response = service.call("hello"); + + let request = handle.next_request().unwrap(); + assert_eq!(*request, "hello"); + request.respond("world"); + + assert_eq!(response.wait().unwrap(), "world"); +} + +#[test] +fn clears_canceled_requests() { + let (mut service, mut handle) = new_service(); + + handle.allow(1); + + let res1 = service.call("hello"); + + let req1 = handle.next_request().unwrap(); + assert_eq!(*req1, "hello"); + + // don't respond yet, new requests will get buffered + + let res2 = service.call("hello2"); + with_task(|| { + assert!(handle.poll_request().unwrap().is_not_ready()); + }); + + let res3 = service.call("hello3"); + + drop(res2); + + req1.respond("world"); + assert_eq!(res1.wait().unwrap(), "world"); + + // res2 was dropped, so it should have been canceled in the buffer + handle.allow(1); + + let req3 = handle.next_request().unwrap(); + assert_eq!(*req3, "hello3"); + req3.respond("world3"); + assert_eq!(res3.wait().unwrap(), "world3"); +} + +type Mock = tower_mock::Mock<&'static str, &'static str, ()>; +type Handle = tower_mock::Handle<&'static str, &'static str, ()>; + +struct Exec; + +impl futures::future::Executor> for Exec { + fn execute(&self, fut: Worker) -> Result<(), futures::future::ExecuteError>> { + thread::spawn(move || { + fut.wait().unwrap(); + }); + Ok(()) + } +} + +fn new_service() -> (Buffer, Handle) { + let (service, handle) = Mock::new(); + let service = Buffer::new(service, &Exec).unwrap(); + (service, handle) +} + +fn with_task U, U>(f: F) -> U { + use futures::future::{Future, lazy}; + lazy(|| Ok::<_, ()>(f())).wait().unwrap() +}