diff --git a/tower-buffer/tests/buffer.rs b/tower-buffer/tests/buffer.rs index 12945c6..5bd5348 100644 --- a/tower-buffer/tests/buffer.rs +++ b/tower-buffer/tests/buffer.rs @@ -7,7 +7,6 @@ use futures::prelude::*; use tower_buffer::*; use tower_service::*; -use std::fmt; use std::thread; #[test] @@ -90,7 +89,7 @@ fn when_inner_fails() { // Make the service NotReady handle.allow(0); - handle.error(Error("foobar")); + handle.error("foobar"); let mut res1 = service.call("hello"); @@ -101,39 +100,17 @@ fn when_inner_fails() { if let Some(e) = e.downcast_ref::() { assert!(format!("{}", e).contains("poll_ready")); - let e = e - .source() - .expect("nope 1") - .downcast_ref::>() - .expect("nope 1_2"); + let e = e.source().unwrap(); - match e { - tower_mock::Error::Other(e) => assert_eq!(e.0, "foobar"), - _ => panic!("unexpected mock error"), - } + assert_eq!(e.to_string(), "foobar"); } else { panic!("unexpected error type: {:?}", e); } }); } -#[derive(Debug)] -struct Error(&'static str); - -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(fmt) - } -} - -impl ::std::error::Error for Error { - fn source(&self) -> Option<&(::std::error::Error + 'static)> { - None - } -} - -type Mock = tower_mock::Mock<&'static str, &'static str, Error>; -type Handle = tower_mock::Handle<&'static str, &'static str, Error>; +type Mock = tower_mock::Mock<&'static str, &'static str>; +type Handle = tower_mock::Handle<&'static str, &'static str>; struct Exec; diff --git a/tower-filter/tests/filter.rs b/tower-filter/tests/filter.rs index a599487..a1cdb2d 100644 --- a/tower-filter/tests/filter.rs +++ b/tower-filter/tests/filter.rs @@ -117,8 +117,8 @@ fn saturate() { th2.join().unwrap(); } -type Mock = tower_mock::Mock; -type Handle = tower_mock::Handle; +type Mock = tower_mock::Mock; +type Handle = tower_mock::Handle; fn new_service(max: usize, f: F) -> (Filter, Handle) where diff --git a/tower-in-flight-limit/tests/in_flight_limit.rs b/tower-in-flight-limit/tests/in_flight_limit.rs index 0bc3d50..312b4d5 100644 --- a/tower-in-flight-limit/tests/in_flight_limit.rs +++ b/tower-in-flight-limit/tests/in_flight_limit.rs @@ -208,7 +208,7 @@ fn response_error_releases_capacity() { // s1 sends the request, then s2 is able to get capacity let r1 = s1.call("hello"); let request = handle.next_request().unwrap(); - request.error(()); + request.error("boom"); r1.wait().unwrap_err(); @@ -244,8 +244,8 @@ fn response_future_drop_releases_capacity() { }); } -type Mock = tower_mock::Mock<&'static str, &'static str, ()>; -type Handle = tower_mock::Handle<&'static str, &'static str, ()>; +type Mock = tower_mock::Mock<&'static str, &'static str>; +type Handle = tower_mock::Handle<&'static str, &'static str>; fn new_service(max: usize) -> (InFlightLimit, Handle) { let (service, handle) = Mock::new(); diff --git a/tower-mock/Cargo.toml b/tower-mock/Cargo.toml index 6a1efc0..beb5df3 100644 --- a/tower-mock/Cargo.toml +++ b/tower-mock/Cargo.toml @@ -6,4 +6,5 @@ publish = false [dependencies] futures = "0.1" +tokio-sync = "0.1.3" tower-service = { version = "0.2", path = "../tower-service" } diff --git a/tower-mock/src/error.rs b/tower-mock/src/error.rs new file mode 100644 index 0000000..1fb3064 --- /dev/null +++ b/tower-mock/src/error.rs @@ -0,0 +1,23 @@ +//! Error types + +use std::error; +use std::fmt; + +pub(crate) type Error = Box; + +#[derive(Debug)] +pub struct Closed(()); + +impl Closed { + pub(crate) fn new() -> Closed { + Closed(()) + } +} + +impl fmt::Display for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "service closed") + } +} + +impl error::Error for Closed {} diff --git a/tower-mock/src/future.rs b/tower-mock/src/future.rs new file mode 100644 index 0000000..0061196 --- /dev/null +++ b/tower-mock/src/future.rs @@ -0,0 +1,40 @@ +//! Future types + +use error::{self, Error}; +use futures::{Async, Future, Poll}; +use tokio_sync::oneshot; + +/// Future of the `Mock` response. +#[derive(Debug)] +pub struct ResponseFuture { + rx: Option>, +} + +type Rx = oneshot::Receiver>; + +impl ResponseFuture { + pub(crate) fn new(rx: Rx) -> ResponseFuture { + ResponseFuture { rx: Some(rx) } + } + + pub(crate) fn closed() -> ResponseFuture { + ResponseFuture { rx: None } + } +} + +impl Future for ResponseFuture { + type Item = T; + type Error = Error; + + fn poll(&mut self) -> Poll { + match self.rx { + Some(ref mut rx) => match rx.poll() { + Ok(Async::Ready(Ok(v))) => Ok(v.into()), + Ok(Async::Ready(Err(e))) => Err(e), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => Err(error::Closed::new().into()), + }, + None => Err(error::Closed::new().into()), + } + } +} diff --git a/tower-mock/src/lib.rs b/tower-mock/src/lib.rs index 6cfe2fe..7e26985 100644 --- a/tower-mock/src/lib.rs +++ b/tower-mock/src/lib.rs @@ -1,88 +1,78 @@ //! Mock `Service` that can be used in tests. extern crate futures; +extern crate tokio_sync; extern crate tower_service; -use tower_service::Service; +pub mod error; +pub mod future; -use futures::sync::{mpsc, oneshot}; +use error::Error; +use future::ResponseFuture; use futures::task::{self, Task}; use futures::{Async, Future, Poll, Stream}; +use tokio_sync::{mpsc, oneshot}; +use tower_service::Service; use std::collections::HashMap; -use std::fmt; use std::sync::{Arc, Mutex}; use std::{ops, u64}; /// A mock service #[derive(Debug)] -pub struct Mock { +pub struct Mock { id: u64, - tx: Mutex>, - state: Arc>>, + tx: Mutex>, + state: Arc>, can_send: bool, } /// Handle to the `Mock`. #[derive(Debug)] -pub struct Handle { - rx: Rx, - state: Arc>>, +pub struct Handle { + rx: Rx, + state: Arc>, } #[derive(Debug)] -pub struct Request { +pub struct Request { request: T, - respond: Respond, + respond: Respond, } /// Respond to a request received by `Mock`. #[derive(Debug)] -pub struct Respond { - tx: oneshot::Sender>, -} - -/// Future of the `Mock` response. -#[derive(Debug)] -pub struct ResponseFuture { - // Slight abuse of the error enum... - rx: Error>>, -} - -/// Enumeration of errors that can be returned by `Mock`. -#[derive(Debug, PartialEq)] -pub enum Error { - Closed, - Other(T), +pub struct Respond { + tx: oneshot::Sender>, } #[derive(Debug)] -struct State { - // Tracks the number of requests that can be sent through +struct State { + /// Tracks the number of requests that can be sent through rem: u64, - // Tasks that are blocked + /// Tasks that are blocked tasks: HashMap, - // Tracks if the `Handle` dropped + /// Tracks if the `Handle` dropped is_closed: bool, - // Tracks the ID for the next mock clone + /// Tracks the ID for the next mock clone next_clone_id: u64, - // Tracks the next error to yield (if any) - err_with: Option, + /// Tracks the next error to yield (if any) + err_with: Option, } -type Tx = mpsc::UnboundedSender>; -type Rx = mpsc::UnboundedReceiver>; +type Tx = mpsc::UnboundedSender>; +type Rx = mpsc::UnboundedReceiver>; // ===== impl Mock ===== -impl Mock { +impl Mock { /// Create a new `Mock` and `Handle` pair. - pub fn new() -> (Self, Handle) { - let (tx, rx) = mpsc::unbounded(); + pub fn new() -> (Self, Handle) { + let (tx, rx) = mpsc::unbounded_channel(); let tx = Mutex::new(tx); let state = Arc::new(Mutex::new(State::new())); @@ -100,16 +90,16 @@ impl Mock { } } -impl Service for Mock { +impl Service for Mock { type Response = U; - type Error = Error; - type Future = ResponseFuture; + type Error = Error; + type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { let mut state = self.state.lock().unwrap(); if state.is_closed { - return Err(Error::Closed); + return Err(error::Closed::new().into()); } if self.can_send { @@ -117,7 +107,7 @@ impl Service for Mock { } if let Some(e) = state.err_with.take() { - return Err(Error::Other(e)); + return Err(e); } if state.rem > 0 { @@ -143,7 +133,7 @@ impl Service for Mock { let mut state = self.state.lock().unwrap(); if state.is_closed { - return ResponseFuture { rx: Error::Closed }; + return ResponseFuture::closed(); } if !self.can_send { @@ -166,21 +156,19 @@ impl Service for Mock { respond: Respond { tx }, }; - match self.tx.lock().unwrap().unbounded_send(request) { + match self.tx.lock().unwrap().try_send(request) { Ok(_) => {} Err(_) => { // TODO: Can this be reached - return ResponseFuture { rx: Error::Closed }; + return ResponseFuture::closed(); } } - ResponseFuture { - rx: Error::Other(rx), - } + ResponseFuture::new(rx) } } -impl Clone for Mock { +impl Clone for Mock { fn clone(&self) -> Self { let id = { let mut state = self.state.lock().unwrap(); @@ -202,7 +190,7 @@ impl Clone for Mock { } } -impl Drop for Mock { +impl Drop for Mock { fn drop(&mut self) { let mut state = match self.state.lock() { Ok(v) => v, @@ -221,16 +209,16 @@ impl Drop for Mock { // ===== impl Handle ===== -impl Handle { +impl Handle { /// Asynchronously gets the next request - pub fn poll_request(&mut self) -> Poll>, ()> { - self.rx.poll() + pub fn poll_request(&mut self) -> Poll>, Error> { + self.rx.poll().map_err(Into::into) } /// Synchronously gets the next request. /// /// This function blocks the current thread until a request is received. - pub fn next_request(&mut self) -> Option> { + pub fn next_request(&mut self) -> Option> { use futures::future::poll_fn; poll_fn(|| self.poll_request()).wait().unwrap() } @@ -248,9 +236,9 @@ impl Handle { } /// Make the next poll_ method error with the given error. - pub fn error(&mut self, e: E) { + pub fn error>(&mut self, e: E) { let mut state = self.state.lock().unwrap(); - state.err_with = Some(e); + state.err_with = Some(e.into()); for (_, task) in state.tasks.drain() { task.notify(); @@ -258,7 +246,7 @@ impl Handle { } } -impl Drop for Handle { +impl Drop for Handle { fn drop(&mut self) { let mut state = match self.state.lock() { Ok(v) => v, @@ -281,9 +269,9 @@ impl Drop for Handle { // ===== impl Request ===== -impl Request { +impl Request { /// Split the request and respond handle - pub fn into_parts(self) -> (T, Respond) { + pub fn into_parts(self) -> (T, Respond) { (self.request, self.respond) } @@ -291,12 +279,12 @@ impl Request { self.respond.respond(response) } - pub fn error(self, err: E) { + pub fn error>(self, err: E) { self.respond.error(err) } } -impl ops::Deref for Request { +impl ops::Deref for Request { type Target = T; fn deref(&self) -> &T { @@ -306,67 +294,22 @@ impl ops::Deref for Request { // ===== impl Respond ===== -impl Respond { +impl Respond { pub fn respond(self, response: T) { // TODO: Should the result be dropped? let _ = self.tx.send(Ok(response)); } - pub fn error(self, err: E) { + pub fn error>(self, err: E) { // TODO: Should the result be dropped? - let _ = self.tx.send(Err(err)); - } -} - -// ===== impl ResponseFuture ===== - -impl Future for ResponseFuture { - type Item = T; - type Error = Error; - - fn poll(&mut self) -> Poll { - match self.rx { - Error::Other(ref mut rx) => match rx.poll() { - Ok(Async::Ready(Ok(v))) => Ok(v.into()), - Ok(Async::Ready(Err(e))) => Err(Error::Other(e)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err(Error::Closed), - }, - Error::Closed => Err(Error::Closed), - } - } -} - -// ===== impl Error ===== - -impl fmt::Display for Error -where - T: fmt::Display, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Closed => write!(fmt, "mock service is closed"), - Error::Other(e) => e.fmt(fmt), - } - } -} - -impl std::error::Error for Error -where - T: std::error::Error + 'static, -{ - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - Error::Closed => None, - Error::Other(e) => Some(e), - } + let _ = self.tx.send(Err(err.into())); } } // ===== impl State ===== -impl State { - fn new() -> State { +impl State { + fn new() -> State { State { rem: u64::MAX, tasks: HashMap::new(), diff --git a/tower-mock/tests/mock.rs b/tower-mock/tests/mock.rs index 0bd0b7f..f40eff3 100644 --- a/tower-mock/tests/mock.rs +++ b/tower-mock/tests/mock.rs @@ -50,8 +50,8 @@ fn backpressure() { mock.call("hello?".into()); } -type Mock = tower_mock::Mock; -type Handle = tower_mock::Handle; +type Mock = tower_mock::Mock; +type Handle = tower_mock::Handle; fn new_mock() -> (Mock, Handle) { Mock::new() diff --git a/tower-rate-limit/tests/rate_limit.rs b/tower-rate-limit/tests/rate_limit.rs index 6696011..4982ab6 100644 --- a/tower-rate-limit/tests/rate_limit.rs +++ b/tower-rate-limit/tests/rate_limit.rs @@ -54,8 +54,8 @@ fn reaching_capacity() { assert_eq!(response.unwrap(), "done"); } -type Mock = tower_mock::Mock<&'static str, &'static str, ()>; -type Handle = tower_mock::Handle<&'static str, &'static str, ()>; +type Mock = tower_mock::Mock<&'static str, &'static str>; +type Handle = tower_mock::Handle<&'static str, &'static str>; fn new_service(rate: Rate) -> (RateLimit, Handle) { let (service, handle) = Mock::new(); diff --git a/tower-retry/tests/retry.rs b/tower-retry/tests/retry.rs index 60089b5..688ec27 100644 --- a/tower-retry/tests/retry.rs +++ b/tower-retry/tests/retry.rs @@ -48,7 +48,7 @@ fn retry_limit() { assert_eq!(*req3, "hello"); req3.error("retry 3"); - assert_eq!(fut.wait().unwrap_err(), tower_mock::Error::Other("retry 3")); + assert_eq!(fut.wait().unwrap_err().to_string(), "retry 3"); } #[test] @@ -66,7 +66,7 @@ fn retry_error_inspection() { let req2 = handle.next_request().unwrap(); assert_eq!(*req2, "hello"); req2.error("reject"); - assert_eq!(fut.wait().unwrap_err(), tower_mock::Error::Other("reject")); + assert_eq!(fut.wait().unwrap_err().to_string(), "reject"); } #[test] @@ -79,7 +79,7 @@ fn retry_cannot_clone_request() { assert_eq!(*req1, "hello"); req1.error("retry 1"); - assert_eq!(fut.wait().unwrap_err(), tower_mock::Error::Other("retry 1")); + assert_eq!(fut.wait().unwrap_err().to_string(), "retry 1"); } #[test] @@ -100,9 +100,9 @@ fn success_with_cannot_clone() { type Req = &'static str; type Res = &'static str; type InnerError = &'static str; -type Error = tower_mock::Error; -type Mock = tower_mock::Mock; -type Handle = tower_mock::Handle; +type Error = Box<::std::error::Error + Send + Sync>; +type Mock = tower_mock::Mock; +type Handle = tower_mock::Handle; #[derive(Clone)] struct RetryErrors; @@ -147,7 +147,7 @@ impl Policy for UnlessErr { type Future = future::FutureResult; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { result.err().and_then(|err| { - if err != &tower_mock::Error::Other(self.0) { + if err.to_string() != self.0 { Some(future::ok(self.clone())) } else { None