diff --git a/tower-timeout/src/error.rs b/tower-timeout/src/error.rs new file mode 100644 index 0000000..6853297 --- /dev/null +++ b/tower-timeout/src/error.rs @@ -0,0 +1,17 @@ +//! Error types + +use std::{error, fmt}; + +pub(crate) type Error = Box; + +/// The timeout elapsed. +#[derive(Debug)] +pub struct Elapsed(pub(super) ()); + +impl fmt::Display for Elapsed { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("request timed out") + } +} + +impl error::Error for Elapsed {} diff --git a/tower-timeout/src/future.rs b/tower-timeout/src/future.rs new file mode 100644 index 0000000..712e12e --- /dev/null +++ b/tower-timeout/src/future.rs @@ -0,0 +1,41 @@ +//! Future types + +use crate::error::{Elapsed, Error}; +use futures::{Async, Future, Poll}; +use tokio_timer::Delay; + +/// `Timeout` response future +#[derive(Debug)] +pub struct ResponseFuture { + response: T, + sleep: Delay, +} + +impl ResponseFuture { + pub(crate) fn new(response: T, sleep: Delay) -> Self { + ResponseFuture { response, sleep } + } +} + +impl Future for ResponseFuture +where + T: Future, + Error: From, +{ + type Item = T::Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + // First, try polling the future + match self.response.poll()? { + Async::Ready(v) => return Ok(Async::Ready(v)), + Async::NotReady => {} + } + + // Now check the sleep + match self.sleep.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(_) => Err(Elapsed(()).into()), + } + } +} diff --git a/tower-timeout/src/layer.rs b/tower-timeout/src/layer.rs new file mode 100644 index 0000000..4ac3c1d --- /dev/null +++ b/tower-timeout/src/layer.rs @@ -0,0 +1,32 @@ +use crate::{Error, Timeout}; +use std::time::Duration; +use tower_layer::Layer; +use tower_service::Service; + +/// Applies a timeout to requests via the supplied inner service. +#[derive(Debug)] +pub struct TimeoutLayer { + timeout: Duration, +} + +impl TimeoutLayer { + /// Create a timeout from a duration + pub fn new(timeout: Duration) -> Self { + TimeoutLayer { timeout } + } +} + +impl Layer for TimeoutLayer +where + S: Service, + Error: From, +{ + type Response = S::Response; + type Error = Error; + type LayerError = (); + type Service = Timeout; + + fn layer(&self, service: S) -> Result { + Ok(Timeout::new(service, self.timeout)) + } +} diff --git a/tower-timeout/src/lib.rs b/tower-timeout/src/lib.rs index 7df3aa8..7ea883c 100644 --- a/tower-timeout/src/lib.rs +++ b/tower-timeout/src/lib.rs @@ -12,17 +12,21 @@ extern crate tokio_timer; extern crate tower_layer; extern crate tower_service; -use futures::{Async, Future, Poll}; +pub mod error; +pub mod future; +mod layer; + +pub use crate::layer::TimeoutLayer; + +use crate::error::Error; +use crate::future::ResponseFuture; +use futures::Poll; use tokio_timer::{clock, Delay}; -use tower_layer::Layer; + use tower_service::Service; use std::time::Duration; -use self::error::Elapsed; - -type Error = Box<::std::error::Error + Send + Sync>; - /// Applies a timeout to requests. #[derive(Debug, Clone)] pub struct Timeout { @@ -30,43 +34,6 @@ pub struct Timeout { timeout: Duration, } -/// Applies a timeout to requests via the supplied inner service. -#[derive(Debug)] -pub struct TimeoutLayer { - timeout: Duration, -} - -/// `Timeout` response future -#[derive(Debug)] -pub struct ResponseFuture { - response: T, - sleep: Delay, -} - -// ===== impl TimeoutLayer ===== - -impl TimeoutLayer { - /// Create a timeout from a duration - pub fn new(timeout: Duration) -> Self { - TimeoutLayer { timeout } - } -} - -impl Layer for TimeoutLayer -where - S: Service, - S::Error: Into, -{ - type Response = S::Response; - type Error = Error; - type LayerError = (); - type Service = Timeout; - - fn layer(&self, service: S) -> Result { - Ok(Timeout::new(service, self.timeout)) - } -} - // ===== impl Timeout ===== impl Timeout { @@ -79,7 +46,7 @@ impl Timeout { impl Service for Timeout where S: Service, - S::Error: Into, + Error: From, { type Response = S::Response; type Error = Error; @@ -90,53 +57,9 @@ where } fn call(&mut self, request: Request) -> Self::Future { - ResponseFuture { - response: self.inner.call(request), - sleep: Delay::new(clock::now() + self.timeout), - } + let response = self.inner.call(request); + let sleep = Delay::new(clock::now() + self.timeout); + + ResponseFuture::new(response, sleep) } } - -// ===== impl ResponseFuture ===== - -impl Future for ResponseFuture -where - T: Future, - T::Error: Into, -{ - type Item = T::Item; - type Error = Error; - - fn poll(&mut self) -> Poll { - // First, try polling the future - match self.response.poll().map_err(Into::into)? { - Async::Ready(v) => return Ok(Async::Ready(v)), - Async::NotReady => {} - } - - // Now check the sleep - match self.sleep.poll()? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(_) => Err(Elapsed(()).into()), - } - } -} - -// ===== impl Error ===== - -/// Timeout error types -pub mod error { - use std::{error::Error, fmt}; - - /// The timeout elapsed. - #[derive(Debug)] - pub struct Elapsed(pub(super) ()); - - impl fmt::Display for Elapsed { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("request timed out") - } - } - - impl Error for Elapsed {} -}