From 720d31c65ff2ec9c2531dc7f5c46bc0456847295 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 8 Mar 2019 22:44:48 -0800 Subject: [PATCH] rate-limit: Refresh layout (#189) - Switch to `Box` - Break up lib.rs into multiple files. - Use `tokio::clock::now` instead of `Instant::now`. --- tower-rate-limit/src/error.rs | 18 +++++ tower-rate-limit/src/future.rs | 26 ++++++ tower-rate-limit/src/layer.rs | 32 ++++++++ tower-rate-limit/src/lib.rs | 144 ++++++--------------------------- tower-rate-limit/src/rate.rs | 29 +++++++ 5 files changed, 128 insertions(+), 121 deletions(-) create mode 100644 tower-rate-limit/src/error.rs create mode 100644 tower-rate-limit/src/future.rs create mode 100644 tower-rate-limit/src/layer.rs create mode 100644 tower-rate-limit/src/rate.rs diff --git a/tower-rate-limit/src/error.rs b/tower-rate-limit/src/error.rs new file mode 100644 index 0000000..aa2eaa9 --- /dev/null +++ b/tower-rate-limit/src/error.rs @@ -0,0 +1,18 @@ +use std::error; + +pub(crate) type Error = Box; + +pub(crate) mod never { + use std::{error, fmt}; + + #[derive(Debug)] + pub enum Never {} + + impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + unreachable!(); + } + } + + impl error::Error for Never {} +} diff --git a/tower-rate-limit/src/future.rs b/tower-rate-limit/src/future.rs new file mode 100644 index 0000000..0104614 --- /dev/null +++ b/tower-rate-limit/src/future.rs @@ -0,0 +1,26 @@ +use crate::Error; +use futures::{Future, Poll}; + +#[derive(Debug)] +pub struct ResponseFuture { + inner: T, +} + +impl ResponseFuture { + pub(crate) fn new(inner: T) -> ResponseFuture { + ResponseFuture { inner } + } +} + +impl Future for ResponseFuture +where + T: Future, + Error: From, +{ + type Item = T::Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + self.inner.poll().map_err(Into::into) + } +} diff --git a/tower-rate-limit/src/layer.rs b/tower-rate-limit/src/layer.rs new file mode 100644 index 0000000..8443e22 --- /dev/null +++ b/tower-rate-limit/src/layer.rs @@ -0,0 +1,32 @@ +use crate::error::{never::Never, Error}; +use crate::{Rate, RateLimit}; +use std::time::Duration; +use tower_layer::Layer; +use tower_service::Service; + +#[derive(Debug)] +pub struct RateLimitLayer { + rate: Rate, +} + +impl RateLimitLayer { + pub fn new(num: u64, per: Duration) -> Self { + let rate = Rate::new(num, per); + RateLimitLayer { rate } + } +} + +impl Layer for RateLimitLayer +where + S: Service, + Error: From, +{ + type Response = S::Response; + type Error = Error; + type LayerError = Never; + type Service = RateLimit; + + fn layer(&self, service: S) -> Result { + Ok(RateLimit::new(service, self.rate)) + } +} diff --git a/tower-rate-limit/src/lib.rs b/tower-rate-limit/src/lib.rs index a3f8a01..f8d2e31 100644 --- a/tower-rate-limit/src/lib.rs +++ b/tower-rate-limit/src/lib.rs @@ -7,13 +7,21 @@ extern crate tokio_timer; extern crate tower_layer; extern crate tower_service; +pub mod error; +pub mod future; +mod layer; +mod rate; + +pub use crate::layer::RateLimitLayer; +pub use crate::rate::Rate; + +use crate::error::Error; +use crate::future::ResponseFuture; use futures::{Future, Poll}; -use tokio_timer::Delay; -use tower_layer::Layer; +use tokio_timer::{clock, Delay}; use tower_service::Service; -use std::time::{Duration, Instant}; -use std::{error, fmt}; +use std::time::Instant; #[derive(Debug)] pub struct RateLimit { @@ -22,30 +30,6 @@ pub struct RateLimit { state: State, } -#[derive(Debug)] -pub struct RateLimitLayer { - rate: Rate, -} - -#[derive(Debug, Copy, Clone)] -pub struct Rate { - num: u64, - per: Duration, -} - -/// The request has been rate limited -/// -/// TODO: Consider returning the original request -#[derive(Debug)] -pub enum Error { - RateLimit, - Upstream(T), -} - -pub struct ResponseFuture { - inner: T, -} - #[derive(Debug)] enum State { // The service has hit its limit @@ -53,27 +37,6 @@ enum State { Ready { until: Instant, rem: u64 }, } -impl RateLimitLayer { - pub fn new(num: u64, per: Duration) -> Self { - let rate = Rate { num, per }; - RateLimitLayer { rate } - } -} - -impl Layer for RateLimitLayer -where - S: Service, -{ - type Response = S::Response; - type Error = Error; - type LayerError = (); - type Service = RateLimit; - - fn layer(&self, service: S) -> Result { - Ok(RateLimit::new(service, self.rate)) - } -} - impl RateLimit { /// Create a new rate limiter pub fn new(inner: T, rate: Rate) -> Self @@ -81,8 +44,8 @@ impl RateLimit { T: Service, { let state = State::Ready { - until: Instant::now(), - rem: rate.num, + until: clock::now(), + rem: rate.num(), }; RateLimit { @@ -108,41 +71,26 @@ impl RateLimit { } } -impl Rate { - /// Create a new rate - /// - /// # Panics - /// - /// This function panics if `num` or `per` is 0. - pub fn new(num: u64, per: Duration) -> Self { - assert!(num > 0); - assert!(per > Duration::from_millis(0)); - - Rate { num, per } - } -} - impl Service for RateLimit where S: Service, + Error: From, { type Response = S::Response; - type Error = Error; + type Error = Error; type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { match self.state { State::Ready { .. } => return Ok(().into()), State::Limited(ref mut sleep) => { - let res = sleep.poll().map_err(|_| Error::RateLimit); - - try_ready!(res); + try_ready!(sleep.poll()); } } self.state = State::Ready { - until: Instant::now() + self.rate.per, - rem: self.rate.num, + until: clock::now() + self.rate.per(), + rem: self.rate.num(), }; Ok(().into()) @@ -151,12 +99,12 @@ where fn call(&mut self, request: Request) -> Self::Future { match self.state { State::Ready { mut until, mut rem } => { - let now = Instant::now(); + let now = clock::now(); // If the period has elapsed, reset it. if now >= until { - until = now + self.rate.per; - let rem = self.rate.num; + until = now + self.rate.per(); + let rem = self.rate.num(); self.state = State::Ready { until, rem } } @@ -172,55 +120,9 @@ where // Call the inner future let inner = self.inner.call(request); - ResponseFuture { inner } + ResponseFuture::new(inner) } State::Limited(..) => panic!("service not ready; poll_ready must be called first"), } } } - -impl Future for ResponseFuture -where - T: Future, -{ - type Item = T::Item; - type Error = Error; - - fn poll(&mut self) -> Poll { - self.inner.poll().map_err(Error::Upstream) - } -} - -// ===== impl Error ===== - -impl fmt::Display for Error -where - T: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Error::Upstream(ref why) => fmt::Display::fmt(why, f), - Error::RateLimit => f.pad("rate limit exceeded"), - } - } -} - -impl error::Error for Error -where - T: error::Error, -{ - fn cause(&self) -> Option<&error::Error> { - if let Error::Upstream(ref why) = *self { - Some(why) - } else { - None - } - } - - fn description(&self) -> &str { - match *self { - Error::Upstream(_) => "upstream service error", - Error::RateLimit => "rate limit exceeded", - } - } -} diff --git a/tower-rate-limit/src/rate.rs b/tower-rate-limit/src/rate.rs new file mode 100644 index 0000000..9e57e36 --- /dev/null +++ b/tower-rate-limit/src/rate.rs @@ -0,0 +1,29 @@ +use std::time::Duration; + +#[derive(Debug, Copy, Clone)] +pub struct Rate { + num: u64, + per: Duration, +} + +impl Rate { + /// Create a new rate + /// + /// # Panics + /// + /// This function panics if `num` or `per` is 0. + pub fn new(num: u64, per: Duration) -> Self { + assert!(num > 0); + assert!(per > Duration::from_millis(0)); + + Rate { num, per } + } + + pub(crate) fn num(&self) -> u64 { + self.num + } + + pub(crate) fn per(&self) -> Duration { + self.per + } +}