parent
749e46b3f5
commit
bf8c3b885a
|
@ -7,7 +7,6 @@ use futures::prelude::*;
|
|||
use tower_buffer::*;
|
||||
use tower_service::*;
|
||||
|
||||
use std::fmt;
|
||||
use std::thread;
|
||||
|
||||
#[test]
|
||||
|
@ -90,7 +89,7 @@ fn when_inner_fails() {
|
|||
|
||||
// Make the service NotReady
|
||||
handle.allow(0);
|
||||
handle.error(Error("foobar"));
|
||||
handle.error("foobar");
|
||||
|
||||
let mut res1 = service.call("hello");
|
||||
|
||||
|
@ -101,39 +100,17 @@ fn when_inner_fails() {
|
|||
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
|
||||
assert!(format!("{}", e).contains("poll_ready"));
|
||||
|
||||
let e = e
|
||||
.source()
|
||||
.expect("nope 1")
|
||||
.downcast_ref::<tower_mock::Error<Error>>()
|
||||
.expect("nope 1_2");
|
||||
let e = e.source().unwrap();
|
||||
|
||||
match e {
|
||||
tower_mock::Error::Other(e) => assert_eq!(e.0, "foobar"),
|
||||
_ => panic!("unexpected mock error"),
|
||||
}
|
||||
assert_eq!(e.to_string(), "foobar");
|
||||
} else {
|
||||
panic!("unexpected error type: {:?}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Error(&'static str);
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
self.0.fmt(fmt)
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::error::Error for Error {
|
||||
fn source(&self) -> Option<&(::std::error::Error + 'static)> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
type Mock = tower_mock::Mock<&'static str, &'static str, Error>;
|
||||
type Handle = tower_mock::Handle<&'static str, &'static str, Error>;
|
||||
type Mock = tower_mock::Mock<&'static str, &'static str>;
|
||||
type Handle = tower_mock::Handle<&'static str, &'static str>;
|
||||
|
||||
struct Exec;
|
||||
|
||||
|
|
|
@ -117,8 +117,8 @@ fn saturate() {
|
|||
th2.join().unwrap();
|
||||
}
|
||||
|
||||
type Mock = tower_mock::Mock<String, String, ()>;
|
||||
type Handle = tower_mock::Handle<String, String, ()>;
|
||||
type Mock = tower_mock::Mock<String, String>;
|
||||
type Handle = tower_mock::Handle<String, String>;
|
||||
|
||||
fn new_service<F, U>(max: usize, f: F) -> (Filter<Mock, F>, Handle)
|
||||
where
|
||||
|
|
|
@ -208,7 +208,7 @@ fn response_error_releases_capacity() {
|
|||
// s1 sends the request, then s2 is able to get capacity
|
||||
let r1 = s1.call("hello");
|
||||
let request = handle.next_request().unwrap();
|
||||
request.error(());
|
||||
request.error("boom");
|
||||
|
||||
r1.wait().unwrap_err();
|
||||
|
||||
|
@ -244,8 +244,8 @@ fn response_future_drop_releases_capacity() {
|
|||
});
|
||||
}
|
||||
|
||||
type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
|
||||
type Handle = tower_mock::Handle<&'static str, &'static str, ()>;
|
||||
type Mock = tower_mock::Mock<&'static str, &'static str>;
|
||||
type Handle = tower_mock::Handle<&'static str, &'static str>;
|
||||
|
||||
fn new_service(max: usize) -> (InFlightLimit<Mock>, Handle) {
|
||||
let (service, handle) = Mock::new();
|
||||
|
|
|
@ -6,4 +6,5 @@ publish = false
|
|||
|
||||
[dependencies]
|
||||
futures = "0.1"
|
||||
tokio-sync = "0.1.3"
|
||||
tower-service = { version = "0.2", path = "../tower-service" }
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
//! Error types
|
||||
|
||||
use std::error;
|
||||
use std::fmt;
|
||||
|
||||
pub(crate) type Error = Box<error::Error + Send + Sync>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Closed(());
|
||||
|
||||
impl Closed {
|
||||
pub(crate) fn new() -> Closed {
|
||||
Closed(())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Closed {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "service closed")
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for Closed {}
|
|
@ -0,0 +1,40 @@
|
|||
//! Future types
|
||||
|
||||
use error::{self, Error};
|
||||
use futures::{Async, Future, Poll};
|
||||
use tokio_sync::oneshot;
|
||||
|
||||
/// Future of the `Mock` response.
|
||||
#[derive(Debug)]
|
||||
pub struct ResponseFuture<T> {
|
||||
rx: Option<Rx<T>>,
|
||||
}
|
||||
|
||||
type Rx<T> = oneshot::Receiver<Result<T, Error>>;
|
||||
|
||||
impl<T> ResponseFuture<T> {
|
||||
pub(crate) fn new(rx: Rx<T>) -> ResponseFuture<T> {
|
||||
ResponseFuture { rx: Some(rx) }
|
||||
}
|
||||
|
||||
pub(crate) fn closed() -> ResponseFuture<T> {
|
||||
ResponseFuture { rx: None }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for ResponseFuture<T> {
|
||||
type Item = T;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.rx {
|
||||
Some(ref mut rx) => match rx.poll() {
|
||||
Ok(Async::Ready(Ok(v))) => Ok(v.into()),
|
||||
Ok(Async::Ready(Err(e))) => Err(e),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(_) => Err(error::Closed::new().into()),
|
||||
},
|
||||
None => Err(error::Closed::new().into()),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,88 +1,78 @@
|
|||
//! Mock `Service` that can be used in tests.
|
||||
|
||||
extern crate futures;
|
||||
extern crate tokio_sync;
|
||||
extern crate tower_service;
|
||||
|
||||
use tower_service::Service;
|
||||
pub mod error;
|
||||
pub mod future;
|
||||
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
use error::Error;
|
||||
use future::ResponseFuture;
|
||||
use futures::task::{self, Task};
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use tokio_sync::{mpsc, oneshot};
|
||||
use tower_service::Service;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{ops, u64};
|
||||
|
||||
/// A mock service
|
||||
#[derive(Debug)]
|
||||
pub struct Mock<T, U, E> {
|
||||
pub struct Mock<T, U> {
|
||||
id: u64,
|
||||
tx: Mutex<Tx<T, U, E>>,
|
||||
state: Arc<Mutex<State<E>>>,
|
||||
tx: Mutex<Tx<T, U>>,
|
||||
state: Arc<Mutex<State>>,
|
||||
can_send: bool,
|
||||
}
|
||||
|
||||
/// Handle to the `Mock`.
|
||||
#[derive(Debug)]
|
||||
pub struct Handle<T, U, E> {
|
||||
rx: Rx<T, U, E>,
|
||||
state: Arc<Mutex<State<E>>>,
|
||||
pub struct Handle<T, U> {
|
||||
rx: Rx<T, U>,
|
||||
state: Arc<Mutex<State>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Request<T, U, E> {
|
||||
pub struct Request<T, U> {
|
||||
request: T,
|
||||
respond: Respond<U, E>,
|
||||
respond: Respond<U>,
|
||||
}
|
||||
|
||||
/// Respond to a request received by `Mock`.
|
||||
#[derive(Debug)]
|
||||
pub struct Respond<T, E> {
|
||||
tx: oneshot::Sender<Result<T, E>>,
|
||||
}
|
||||
|
||||
/// Future of the `Mock` response.
|
||||
#[derive(Debug)]
|
||||
pub struct ResponseFuture<T, E> {
|
||||
// Slight abuse of the error enum...
|
||||
rx: Error<oneshot::Receiver<Result<T, E>>>,
|
||||
}
|
||||
|
||||
/// Enumeration of errors that can be returned by `Mock`.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error<T> {
|
||||
Closed,
|
||||
Other(T),
|
||||
pub struct Respond<T> {
|
||||
tx: oneshot::Sender<Result<T, Error>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct State<E> {
|
||||
// Tracks the number of requests that can be sent through
|
||||
struct State {
|
||||
/// Tracks the number of requests that can be sent through
|
||||
rem: u64,
|
||||
|
||||
// Tasks that are blocked
|
||||
/// Tasks that are blocked
|
||||
tasks: HashMap<u64, Task>,
|
||||
|
||||
// Tracks if the `Handle` dropped
|
||||
/// Tracks if the `Handle` dropped
|
||||
is_closed: bool,
|
||||
|
||||
// Tracks the ID for the next mock clone
|
||||
/// Tracks the ID for the next mock clone
|
||||
next_clone_id: u64,
|
||||
|
||||
// Tracks the next error to yield (if any)
|
||||
err_with: Option<E>,
|
||||
/// Tracks the next error to yield (if any)
|
||||
err_with: Option<Error>,
|
||||
}
|
||||
|
||||
type Tx<T, U, E> = mpsc::UnboundedSender<Request<T, U, E>>;
|
||||
type Rx<T, U, E> = mpsc::UnboundedReceiver<Request<T, U, E>>;
|
||||
type Tx<T, U> = mpsc::UnboundedSender<Request<T, U>>;
|
||||
type Rx<T, U> = mpsc::UnboundedReceiver<Request<T, U>>;
|
||||
|
||||
// ===== impl Mock =====
|
||||
|
||||
impl<T, U, E> Mock<T, U, E> {
|
||||
impl<T, U> Mock<T, U> {
|
||||
/// Create a new `Mock` and `Handle` pair.
|
||||
pub fn new() -> (Self, Handle<T, U, E>) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
pub fn new() -> (Self, Handle<T, U>) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let tx = Mutex::new(tx);
|
||||
|
||||
let state = Arc::new(Mutex::new(State::new()));
|
||||
|
@ -100,16 +90,16 @@ impl<T, U, E> Mock<T, U, E> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Service<T> for Mock<T, U, E> {
|
||||
impl<T, U> Service<T> for Mock<T, U> {
|
||||
type Response = U;
|
||||
type Error = Error<E>;
|
||||
type Future = ResponseFuture<U, E>;
|
||||
type Error = Error;
|
||||
type Future = ResponseFuture<U>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
if state.is_closed {
|
||||
return Err(Error::Closed);
|
||||
return Err(error::Closed::new().into());
|
||||
}
|
||||
|
||||
if self.can_send {
|
||||
|
@ -117,7 +107,7 @@ impl<T, U, E> Service<T> for Mock<T, U, E> {
|
|||
}
|
||||
|
||||
if let Some(e) = state.err_with.take() {
|
||||
return Err(Error::Other(e));
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if state.rem > 0 {
|
||||
|
@ -143,7 +133,7 @@ impl<T, U, E> Service<T> for Mock<T, U, E> {
|
|||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
if state.is_closed {
|
||||
return ResponseFuture { rx: Error::Closed };
|
||||
return ResponseFuture::closed();
|
||||
}
|
||||
|
||||
if !self.can_send {
|
||||
|
@ -166,21 +156,19 @@ impl<T, U, E> Service<T> for Mock<T, U, E> {
|
|||
respond: Respond { tx },
|
||||
};
|
||||
|
||||
match self.tx.lock().unwrap().unbounded_send(request) {
|
||||
match self.tx.lock().unwrap().try_send(request) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
// TODO: Can this be reached
|
||||
return ResponseFuture { rx: Error::Closed };
|
||||
return ResponseFuture::closed();
|
||||
}
|
||||
}
|
||||
|
||||
ResponseFuture {
|
||||
rx: Error::Other(rx),
|
||||
}
|
||||
ResponseFuture::new(rx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Clone for Mock<T, U, E> {
|
||||
impl<T, U> Clone for Mock<T, U> {
|
||||
fn clone(&self) -> Self {
|
||||
let id = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
@ -202,7 +190,7 @@ impl<T, U, E> Clone for Mock<T, U, E> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Drop for Mock<T, U, E> {
|
||||
impl<T, U> Drop for Mock<T, U> {
|
||||
fn drop(&mut self) {
|
||||
let mut state = match self.state.lock() {
|
||||
Ok(v) => v,
|
||||
|
@ -221,16 +209,16 @@ impl<T, U, E> Drop for Mock<T, U, E> {
|
|||
|
||||
// ===== impl Handle =====
|
||||
|
||||
impl<T, U, E> Handle<T, U, E> {
|
||||
impl<T, U> Handle<T, U> {
|
||||
/// Asynchronously gets the next request
|
||||
pub fn poll_request(&mut self) -> Poll<Option<Request<T, U, E>>, ()> {
|
||||
self.rx.poll()
|
||||
pub fn poll_request(&mut self) -> Poll<Option<Request<T, U>>, Error> {
|
||||
self.rx.poll().map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Synchronously gets the next request.
|
||||
///
|
||||
/// This function blocks the current thread until a request is received.
|
||||
pub fn next_request(&mut self) -> Option<Request<T, U, E>> {
|
||||
pub fn next_request(&mut self) -> Option<Request<T, U>> {
|
||||
use futures::future::poll_fn;
|
||||
poll_fn(|| self.poll_request()).wait().unwrap()
|
||||
}
|
||||
|
@ -248,9 +236,9 @@ impl<T, U, E> Handle<T, U, E> {
|
|||
}
|
||||
|
||||
/// Make the next poll_ method error with the given error.
|
||||
pub fn error(&mut self, e: E) {
|
||||
pub fn error<E: Into<Error>>(&mut self, e: E) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.err_with = Some(e);
|
||||
state.err_with = Some(e.into());
|
||||
|
||||
for (_, task) in state.tasks.drain() {
|
||||
task.notify();
|
||||
|
@ -258,7 +246,7 @@ impl<T, U, E> Handle<T, U, E> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Drop for Handle<T, U, E> {
|
||||
impl<T, U> Drop for Handle<T, U> {
|
||||
fn drop(&mut self) {
|
||||
let mut state = match self.state.lock() {
|
||||
Ok(v) => v,
|
||||
|
@ -281,9 +269,9 @@ impl<T, U, E> Drop for Handle<T, U, E> {
|
|||
|
||||
// ===== impl Request =====
|
||||
|
||||
impl<T, U, E> Request<T, U, E> {
|
||||
impl<T, U> Request<T, U> {
|
||||
/// Split the request and respond handle
|
||||
pub fn into_parts(self) -> (T, Respond<U, E>) {
|
||||
pub fn into_parts(self) -> (T, Respond<U>) {
|
||||
(self.request, self.respond)
|
||||
}
|
||||
|
||||
|
@ -291,12 +279,12 @@ impl<T, U, E> Request<T, U, E> {
|
|||
self.respond.respond(response)
|
||||
}
|
||||
|
||||
pub fn error(self, err: E) {
|
||||
pub fn error<E: Into<Error>>(self, err: E) {
|
||||
self.respond.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> ops::Deref for Request<T, U, E> {
|
||||
impl<T, U> ops::Deref for Request<T, U> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
|
@ -306,67 +294,22 @@ impl<T, U, E> ops::Deref for Request<T, U, E> {
|
|||
|
||||
// ===== impl Respond =====
|
||||
|
||||
impl<T, E> Respond<T, E> {
|
||||
impl<T> Respond<T> {
|
||||
pub fn respond(self, response: T) {
|
||||
// TODO: Should the result be dropped?
|
||||
let _ = self.tx.send(Ok(response));
|
||||
}
|
||||
|
||||
pub fn error(self, err: E) {
|
||||
pub fn error<E: Into<Error>>(self, err: E) {
|
||||
// TODO: Should the result be dropped?
|
||||
let _ = self.tx.send(Err(err));
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl ResponseFuture =====
|
||||
|
||||
impl<T, E> Future for ResponseFuture<T, E> {
|
||||
type Item = T;
|
||||
type Error = Error<E>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.rx {
|
||||
Error::Other(ref mut rx) => match rx.poll() {
|
||||
Ok(Async::Ready(Ok(v))) => Ok(v.into()),
|
||||
Ok(Async::Ready(Err(e))) => Err(Error::Other(e)),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(_) => Err(Error::Closed),
|
||||
},
|
||||
Error::Closed => Err(Error::Closed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Error =====
|
||||
|
||||
impl<T> fmt::Display for Error<T>
|
||||
where
|
||||
T: fmt::Display,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Error::Closed => write!(fmt, "mock service is closed"),
|
||||
Error::Other(e) => e.fmt(fmt),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::error::Error for Error<T>
|
||||
where
|
||||
T: std::error::Error + 'static,
|
||||
{
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
match self {
|
||||
Error::Closed => None,
|
||||
Error::Other(e) => Some(e),
|
||||
}
|
||||
let _ = self.tx.send(Err(err.into()));
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl State =====
|
||||
|
||||
impl<E> State<E> {
|
||||
fn new() -> State<E> {
|
||||
impl State {
|
||||
fn new() -> State {
|
||||
State {
|
||||
rem: u64::MAX,
|
||||
tasks: HashMap::new(),
|
||||
|
|
|
@ -50,8 +50,8 @@ fn backpressure() {
|
|||
mock.call("hello?".into());
|
||||
}
|
||||
|
||||
type Mock = tower_mock::Mock<String, String, ()>;
|
||||
type Handle = tower_mock::Handle<String, String, ()>;
|
||||
type Mock = tower_mock::Mock<String, String>;
|
||||
type Handle = tower_mock::Handle<String, String>;
|
||||
|
||||
fn new_mock() -> (Mock, Handle) {
|
||||
Mock::new()
|
||||
|
|
|
@ -54,8 +54,8 @@ fn reaching_capacity() {
|
|||
assert_eq!(response.unwrap(), "done");
|
||||
}
|
||||
|
||||
type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
|
||||
type Handle = tower_mock::Handle<&'static str, &'static str, ()>;
|
||||
type Mock = tower_mock::Mock<&'static str, &'static str>;
|
||||
type Handle = tower_mock::Handle<&'static str, &'static str>;
|
||||
|
||||
fn new_service(rate: Rate) -> (RateLimit<Mock>, Handle) {
|
||||
let (service, handle) = Mock::new();
|
||||
|
|
|
@ -48,7 +48,7 @@ fn retry_limit() {
|
|||
assert_eq!(*req3, "hello");
|
||||
req3.error("retry 3");
|
||||
|
||||
assert_eq!(fut.wait().unwrap_err(), tower_mock::Error::Other("retry 3"));
|
||||
assert_eq!(fut.wait().unwrap_err().to_string(), "retry 3");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -66,7 +66,7 @@ fn retry_error_inspection() {
|
|||
let req2 = handle.next_request().unwrap();
|
||||
assert_eq!(*req2, "hello");
|
||||
req2.error("reject");
|
||||
assert_eq!(fut.wait().unwrap_err(), tower_mock::Error::Other("reject"));
|
||||
assert_eq!(fut.wait().unwrap_err().to_string(), "reject");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -79,7 +79,7 @@ fn retry_cannot_clone_request() {
|
|||
assert_eq!(*req1, "hello");
|
||||
req1.error("retry 1");
|
||||
|
||||
assert_eq!(fut.wait().unwrap_err(), tower_mock::Error::Other("retry 1"));
|
||||
assert_eq!(fut.wait().unwrap_err().to_string(), "retry 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -100,9 +100,9 @@ fn success_with_cannot_clone() {
|
|||
type Req = &'static str;
|
||||
type Res = &'static str;
|
||||
type InnerError = &'static str;
|
||||
type Error = tower_mock::Error<InnerError>;
|
||||
type Mock = tower_mock::Mock<Req, Res, InnerError>;
|
||||
type Handle = tower_mock::Handle<Req, Res, InnerError>;
|
||||
type Error = Box<::std::error::Error + Send + Sync>;
|
||||
type Mock = tower_mock::Mock<Req, Res>;
|
||||
type Handle = tower_mock::Handle<Req, Res>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RetryErrors;
|
||||
|
@ -147,7 +147,7 @@ impl Policy<Req, Res, Error> for UnlessErr {
|
|||
type Future = future::FutureResult<Self, ()>;
|
||||
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
|
||||
result.err().and_then(|err| {
|
||||
if err != &tower_mock::Error::Other(self.0) {
|
||||
if err.to_string() != self.0 {
|
||||
Some(future::ok(self.clone()))
|
||||
} else {
|
||||
None
|
||||
|
|
Loading…
Reference in New Issue