Reorganize tower-timeout (#191)

- Change error bound: `Error: From<S::Error>` (for `?` operator)
- Split types into dedicated files
- Move `ResponseFuture` into public `future` mod.
This commit is contained in:
Carl Lerche 2019-03-11 16:10:51 -07:00 committed by GitHub
parent 720d31c65f
commit 50fc5e8e63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 92 deletions

View File

@ -0,0 +1,17 @@
//! Error types
use std::{error, fmt};
pub(crate) type Error = Box<error::Error + Send + Sync>;
/// The timeout elapsed.
#[derive(Debug)]
pub struct Elapsed(pub(super) ());
impl fmt::Display for Elapsed {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("request timed out")
}
}
impl error::Error for Elapsed {}

View File

@ -0,0 +1,41 @@
//! Future types
use crate::error::{Elapsed, Error};
use futures::{Async, Future, Poll};
use tokio_timer::Delay;
/// `Timeout` response future
#[derive(Debug)]
pub struct ResponseFuture<T> {
response: T,
sleep: Delay,
}
impl<T> ResponseFuture<T> {
pub(crate) fn new(response: T, sleep: Delay) -> Self {
ResponseFuture { response, sleep }
}
}
impl<T> Future for ResponseFuture<T>
where
T: Future,
Error: From<T::Error>,
{
type Item = T::Item;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.response.poll()? {
Async::Ready(v) => return Ok(Async::Ready(v)),
Async::NotReady => {}
}
// Now check the sleep
match self.sleep.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(_) => Err(Elapsed(()).into()),
}
}
}

View File

@ -0,0 +1,32 @@
use crate::{Error, Timeout};
use std::time::Duration;
use tower_layer::Layer;
use tower_service::Service;
/// Applies a timeout to requests via the supplied inner service.
#[derive(Debug)]
pub struct TimeoutLayer {
timeout: Duration,
}
impl TimeoutLayer {
/// Create a timeout from a duration
pub fn new(timeout: Duration) -> Self {
TimeoutLayer { timeout }
}
}
impl<S, Request> Layer<S, Request> for TimeoutLayer
where
S: Service<Request>,
Error: From<S::Error>,
{
type Response = S::Response;
type Error = Error;
type LayerError = ();
type Service = Timeout<S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Ok(Timeout::new(service, self.timeout))
}
}

View File

@ -12,17 +12,21 @@ extern crate tokio_timer;
extern crate tower_layer;
extern crate tower_service;
use futures::{Async, Future, Poll};
pub mod error;
pub mod future;
mod layer;
pub use crate::layer::TimeoutLayer;
use crate::error::Error;
use crate::future::ResponseFuture;
use futures::Poll;
use tokio_timer::{clock, Delay};
use tower_layer::Layer;
use tower_service::Service;
use std::time::Duration;
use self::error::Elapsed;
type Error = Box<::std::error::Error + Send + Sync>;
/// Applies a timeout to requests.
#[derive(Debug, Clone)]
pub struct Timeout<T> {
@ -30,43 +34,6 @@ pub struct Timeout<T> {
timeout: Duration,
}
/// Applies a timeout to requests via the supplied inner service.
#[derive(Debug)]
pub struct TimeoutLayer {
timeout: Duration,
}
/// `Timeout` response future
#[derive(Debug)]
pub struct ResponseFuture<T> {
response: T,
sleep: Delay,
}
// ===== impl TimeoutLayer =====
impl TimeoutLayer {
/// Create a timeout from a duration
pub fn new(timeout: Duration) -> Self {
TimeoutLayer { timeout }
}
}
impl<S, Request> Layer<S, Request> for TimeoutLayer
where
S: Service<Request>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type LayerError = ();
type Service = Timeout<S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Ok(Timeout::new(service, self.timeout))
}
}
// ===== impl Timeout =====
impl<T> Timeout<T> {
@ -79,7 +46,7 @@ impl<T> Timeout<T> {
impl<S, Request> Service<Request> for Timeout<S>
where
S: Service<Request>,
S::Error: Into<Error>,
Error: From<S::Error>,
{
type Response = S::Response;
type Error = Error;
@ -90,53 +57,9 @@ where
}
fn call(&mut self, request: Request) -> Self::Future {
ResponseFuture {
response: self.inner.call(request),
sleep: Delay::new(clock::now() + self.timeout),
}
let response = self.inner.call(request);
let sleep = Delay::new(clock::now() + self.timeout);
ResponseFuture::new(response, sleep)
}
}
// ===== impl ResponseFuture =====
impl<T> Future for ResponseFuture<T>
where
T: Future,
T::Error: Into<Error>,
{
type Item = T::Item;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.response.poll().map_err(Into::into)? {
Async::Ready(v) => return Ok(Async::Ready(v)),
Async::NotReady => {}
}
// Now check the sleep
match self.sleep.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(_) => Err(Elapsed(()).into()),
}
}
}
// ===== impl Error =====
/// Timeout error types
pub mod error {
use std::{error::Error, fmt};
/// The timeout elapsed.
#[derive(Debug)]
pub struct Elapsed(pub(super) ());
impl fmt::Display for Elapsed {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("request timed out")
}
}
impl Error for Elapsed {}
}