Update tower-limit and prepare for release (#375)

* wip

* Refactor limit tests and prep for release
This commit is contained in:
Lucio Franco 2019-12-04 09:53:52 -05:00 committed by GitHub
parent ec6215fb2f
commit 877c194b1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 242 additions and 263 deletions

View File

@ -8,7 +8,7 @@ members = [
# "tower-filter",
# "tower-hedge",
"tower-layer",
# "tower-limit",
"tower-limit",
# "tower-load",
# "tower-load-shed",
# "tower-ready-cache",

View File

@ -8,13 +8,13 @@ name = "tower-limit"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.2"
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-limit/0.3.0-alpha.2"
documentation = "https://docs.rs/tower-limit/0.3.0"
description = """
Limit maximum request rate to a `Service`.
"""
@ -22,14 +22,14 @@ categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core-preview = "=0.3.0-alpha.19"
tower-service = { version = "=0.3.0-alpha.2", path = "../tower-service" }
tower-layer = { version = "=0.3.0-alpha.2", path = "../tower-layer" }
tokio-sync = "=0.2.0-alpha.6"
tokio-timer = "=0.3.0-alpha.6"
futures-core = "0.3"
tower-service = { version = "0.3", path = "../tower-service" }
tower-layer = { version = "0.3", path = "../tower-layer" }
tokio-sync = "0.2.0-alpha.6"
tokio = { version = "0.2", features = ["time"] }
pin-project = "0.4"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.2", path = "../tower-test" }
tokio-test = "=0.2.0-alpha.6"
futures-util-preview = "=0.3.0-alpha.19"
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = "0.2"
tokio = { version = "0.2", features = ["macros", "test-util"] }

View File

@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0-alpha.2")]
#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,

View File

@ -5,11 +5,9 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio_timer::{clock, Delay};
use tokio::time::{Delay, Instant};
use tower_service::Service;
use std::time::Instant;
/// Enforces a rate limit on the number of requests the underlying
/// service can handle over a period of time.
#[derive(Debug)]
@ -30,7 +28,7 @@ impl<T> RateLimit<T> {
/// Create a new rate limiter
pub fn new(inner: T, rate: Rate) -> Self {
let state = State::Ready {
until: clock::now(),
until: Instant::now(),
rem: rate.num(),
};
@ -74,7 +72,7 @@ where
}
self.state = State::Ready {
until: clock::now() + self.rate.per(),
until: Instant::now() + self.rate.per(),
rem: self.rate.num(),
};
@ -84,7 +82,7 @@ where
fn call(&mut self, request: Request) -> Self::Future {
match self.state {
State::Ready { mut until, mut rem } => {
let now = clock::now();
let now = Instant::now();
// If the period has elapsed, reset it.
if now >= until {
@ -99,7 +97,7 @@ where
self.state = State::Ready { until, rem };
} else {
// The service is disabled until further notice
let sleep = tokio_timer::delay(until);
let sleep = tokio::time::delay_until(until);
self.state = State::Limited(sleep);
}

View File

@ -1,27 +1,21 @@
use futures_util::{future::poll_fn, pin_mut};
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, block_on, task::MockTask};
use tower_limit::concurrency::ConcurrencyLimit;
use tower_service::Service;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
use tower_limit::concurrency::ConcurrencyLimitLayer;
use tower_test::{assert_request_eq, mock};
#[test]
fn basic_service_limit_functionality_with_poll_ready() {
let mut task = MockTask::new();
#[tokio::test]
async fn basic_service_limit_functionality_with_poll_ready() {
let limit = ConcurrencyLimitLayer::new(2);
let (mut service, mut handle) = mock::spawn_layer(limit);
let (mut service, handle) = new_service(2);
pin_mut!(handle);
block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap();
assert_ready_ok!(service.poll_ready());
let r1 = service.call("hello 1");
block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap();
assert_ready_ok!(service.poll_ready());
let r2 = service.call("hello 2");
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert_pending!(service.poll_ready());
assert!(!task.is_woken());
assert!(!service.is_woken());
// The request gets passed through
assert_request_eq!(handle, "hello 1").send_response("world 1");
@ -30,232 +24,184 @@ fn basic_service_limit_functionality_with_poll_ready() {
assert_request_eq!(handle, "hello 2").send_response("world 2");
// There are no more requests
task.enter(|cx| {
assert_pending!(handle.as_mut().poll_request(cx));
});
assert_pending!(handle.poll_request());
assert_eq!(block_on(r1).unwrap(), "world 1");
assert!(task.is_woken());
assert_eq!(r1.await.unwrap(), "world 1");
assert!(service.is_woken());
// Another request can be sent
task.enter(|cx| {
assert_ready_ok!(service.poll_ready(cx));
});
assert_ready_ok!(service.poll_ready());
let r3 = service.call("hello 3");
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert_pending!(service.poll_ready());
assert_eq!(block_on(r2).unwrap(), "world 2");
assert_eq!(r2.await.unwrap(), "world 2");
// The request gets passed through
assert_request_eq!(handle, "hello 3").send_response("world 3");
assert_eq!(block_on(r3).unwrap(), "world 3");
assert_eq!(r3.await.unwrap(), "world 3");
}
#[test]
fn basic_service_limit_functionality_without_poll_ready() {
let mut task = MockTask::new();
#[tokio::test]
async fn basic_service_limit_functionality_without_poll_ready() {
let limit = ConcurrencyLimitLayer::new(2);
let (mut service, mut handle) = mock::spawn_layer(limit);
let (mut service, handle) = new_service(2);
pin_mut!(handle);
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
assert_ready_ok!(service.poll_ready());
let r1 = service.call("hello 1");
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
assert_ready_ok!(service.poll_ready());
let r2 = service.call("hello 2");
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_pending!(service.poll_ready());
// The request gets passed through
assert_request_eq!(handle, "hello 1").send_response("world 1");
assert!(!task.is_woken());
assert!(!service.is_woken());
// The next request gets passed through
assert_request_eq!(handle, "hello 2").send_response("world 2");
assert!(!task.is_woken());
assert!(!service.is_woken());
// There are no more requests
assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx)));
assert_pending!(handle.poll_request());
assert_eq!(block_on(r1).unwrap(), "world 1");
assert_eq!(r1.await.unwrap(), "world 1");
assert!(task.is_woken());
assert!(service.is_woken());
// One more request can be sent
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
assert_ready_ok!(service.poll_ready());
let r4 = service.call("hello 4");
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_pending!(service.poll_ready());
assert_eq!(block_on(r2).unwrap(), "world 2");
assert!(task.is_woken());
assert_eq!(r2.await.unwrap(), "world 2");
assert!(service.is_woken());
// The request gets passed through
assert_request_eq!(handle, "hello 4").send_response("world 4");
assert_eq!(block_on(r4).unwrap(), "world 4");
assert_eq!(r4.await.unwrap(), "world 4");
}
#[test]
fn request_without_capacity() {
let mut task = MockTask::new();
#[tokio::test]
async fn request_without_capacity() {
let limit = ConcurrencyLimitLayer::new(0);
let (mut service, _) = mock::spawn_layer::<(), (), _>(limit);
let (mut service, _) = new_service(0);
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert_pending!(service.poll_ready());
}
#[test]
fn reserve_capacity_without_sending_request() {
let mut task = MockTask::new();
let (mut s1, handle) = new_service(1);
pin_mut!(handle);
#[tokio::test]
async fn reserve_capacity_without_sending_request() {
let limit = ConcurrencyLimitLayer::new(1);
let (mut s1, mut handle) = mock::spawn_layer(limit);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
assert_ready_ok!(s1.poll_ready());
// Service 2 cannot get capacity
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
assert_pending!(s2.poll_ready());
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
assert_request_eq!(handle, "hello").send_response("world");
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
assert_pending!(s2.poll_ready());
block_on(r1).unwrap();
r1.await.unwrap();
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
assert_ready_ok!(s2.poll_ready());
}
#[test]
fn service_drop_frees_capacity() {
let mut task = MockTask::new();
let (mut s1, _handle) = new_service(1);
#[tokio::test]
async fn service_drop_frees_capacity() {
let limit = ConcurrencyLimitLayer::new(1);
let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
let mut s2 = s1.clone();
// Reserve capacity in s1
assert_ready_ok!(task.enter(|cx| s1.poll_ready(cx)));
assert_ready_ok!(s1.poll_ready());
// Service 2 cannot get capacity
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
assert_pending!(s2.poll_ready());
drop(s1);
assert!(task.is_woken());
assert_ready_ok!(task.enter(|cx| s2.poll_ready(cx)));
assert!(s2.is_woken());
assert_ready_ok!(s2.poll_ready());
}
#[test]
fn response_error_releases_capacity() {
let mut task = MockTask::new();
let (mut s1, handle) = new_service(1);
pin_mut!(handle);
#[tokio::test]
async fn response_error_releases_capacity() {
let limit = ConcurrencyLimitLayer::new(1);
let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
assert_ready_ok!(s1.poll_ready());
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
assert_request_eq!(handle, "hello").send_error("boom");
block_on(r1).unwrap_err();
r1.await.unwrap_err();
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
assert_ready_ok!(s2.poll_ready());
}
#[test]
fn response_future_drop_releases_capacity() {
let mut task = MockTask::new();
let (mut s1, _handle) = new_service(1);
#[tokio::test]
async fn response_future_drop_releases_capacity() {
let limit = ConcurrencyLimitLayer::new(1);
let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
assert_ready_ok!(s1.poll_ready());
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
assert_pending!(s2.poll_ready());
drop(r1);
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
assert_ready_ok!(s2.poll_ready());
}
#[test]
fn multi_waiters() {
let mut task1 = MockTask::new();
let mut task2 = MockTask::new();
let mut task3 = MockTask::new();
let (mut s1, _handle) = new_service(1);
#[tokio::test]
async fn multi_waiters() {
let limit = ConcurrencyLimitLayer::new(1);
let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
let mut s2 = s1.clone();
let mut s3 = s1.clone();
// Reserve capacity in s1
task1.enter(|cx| assert_ready_ok!(s1.poll_ready(cx)));
assert_ready_ok!(s1.poll_ready());
// s2 and s3 are not ready
task2.enter(|cx| assert_pending!(s2.poll_ready(cx)));
task3.enter(|cx| assert_pending!(s3.poll_ready(cx)));
assert_pending!(s2.poll_ready());
assert_pending!(s3.poll_ready());
drop(s1);
assert!(task2.is_woken());
assert!(!task3.is_woken());
assert!(s2.is_woken());
assert!(!s3.is_woken());
drop(s2);
assert!(task3.is_woken());
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
fn new_service(max: usize) -> (ConcurrencyLimit<Mock>, Handle) {
let (service, handle) = mock::pair();
let service = ConcurrencyLimit::new(service, max);
(service, handle)
assert!(s3.is_woken());
}

View File

@ -1,52 +1,35 @@
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task::MockTask};
use tower_limit::rate::*;
use tower_service::*;
use std::time::Duration;
use tokio::time;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
use tower_limit::rate::RateLimitLayer;
use tower_test::{assert_request_eq, mock};
use std::future::Future;
use std::time::Duration;
#[tokio::test]
async fn reaching_capacity() {
time::pause();
#[test]
fn reaching_capacity() {
clock::mock(|time| {
let mut task = MockTask::new();
let rate_limit = RateLimitLayer::new(1, Duration::from_millis(100));
let (mut service, handle) = new_service(Rate::new(1, from_millis(100)));
pin_mut!(handle);
let (mut service, mut handle) = mock::spawn_layer(rate_limit);
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let response = service.call("hello");
pin_mut!(response);
assert_ready_ok!(service.poll_ready());
assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(task.enter(|cx| response.poll(cx)), "world");
let response = service.call("hello");
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx)));
assert_request_eq!(handle, "hello").send_response("world");
time.advance(Duration::from_millis(100));
assert_eq!(response.await.unwrap(), "world");
assert_pending!(service.poll_ready());
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
assert_pending!(handle.poll_request());
// Send a second request
let response = service.call("two");
pin_mut!(response);
time::advance(Duration::from_millis(101)).await;
assert_request_eq!(handle, "two").send_response("done");
assert_ready_ok!(task.enter(|cx| response.poll(cx)), "done");
});
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
fn new_service(rate: Rate) -> (RateLimit<Mock>, Handle) {
let (service, handle) = mock::pair();
let service = RateLimit::new(service, rate);
(service, handle)
}
fn from_millis(n: u64) -> Duration {
Duration::from_millis(n)
assert_ready_ok!(service.poll_ready());
let response = service.call("two");
assert_request_eq!(handle, "two").send_response("done");
assert_eq!(response.await.unwrap(), "done");
}

View File

@ -24,6 +24,10 @@ edition = "2018"
[dependencies]
futures-util = "0.3"
tokio = { version = "0.2", features = ["sync"]}
tower-layer = { version = "0.3", path = "../tower-layer" }
tokio-test = "0.2"
tower-service = { version = "0.3", path = "../tower-service" }
pin-project = "0.4"
[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }

View File

@ -7,25 +7,20 @@
/// # Examples
///
/// ```rust
/// #[macro_use]
/// extern crate tower_test;
/// extern crate tower_service;
///
/// use tower_service::Service;
/// use tower_test::mock;
/// use std::task::{Poll, Context};
/// use tower_test::{mock, assert_request_eq};
/// use tokio_test::assert_ready;
/// use futures_util::pin_mut;
///
/// # fn main() {
/// mock::task_fn(|cx, mock, handle|{
/// assert_ready!(mock.poll_ready(cx));
/// # async fn test() {
/// let (mut service, mut handle) = mock::spawn();
///
/// let _response = mock.call("hello");
/// assert_ready!(service.poll_ready());
///
/// assert_request_eq!(handle, "hello")
/// .send_response("world");
/// });
/// let response = service.call("hello");
///
/// assert_request_eq!(handle, "hello").send_response("world");
///
/// assert_eq!(response.await.unwrap(), "world");
/// # }
/// ```
#[macro_export]
@ -34,7 +29,7 @@ macro_rules! assert_request_eq {
assert_request_eq!($mock_handle, $expect,)
};
($mock_handle:expr, $expect:expr, $($arg:tt)*) => {{
let (actual, send_response) = match $mock_handle.as_mut().next_request() {
let (actual, send_response) = match $mock_handle.next_request().await {
Some(r) => r,
None => panic!("expected a request but none was received."),
};

View File

@ -2,35 +2,39 @@
pub mod error;
pub mod future;
pub mod spawn;
use crate::mock::{error::Error, future::ResponseFuture};
use crate::mock::{error::Error, future::ResponseFuture, spawn::Spawn};
use core::task::Waker;
use tokio::sync::{mpsc, oneshot};
use tower_layer::Layer;
use tower_service::Service;
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
u64,
};
/// Mock a task and Service.
pub fn task_fn<T, U, F>(mut f: F)
/// Spawn a layer onto a mock service.
pub fn spawn_layer<T, U, L>(layer: L) -> (Spawn<L::Service>, Handle<T, U>)
where
F: FnMut(&mut Context, &mut Pin<&mut Mock<T, U>>, &mut Pin<&mut Handle<T, U>>),
L: Layer<Mock<T, U>>,
{
tokio_test::task::spawn(()).enter(|cx, _| {
let (mock, handle) = pair();
let (inner, handle) = pair();
let svc = layer.layer(inner);
futures_util::pin_mut!(mock);
futures_util::pin_mut!(handle);
(Spawn::new(svc), handle)
}
f(cx, &mut mock, &mut handle)
})
/// Spawn a Service onto a mock task.
pub fn spawn<T, U>() -> (Spawn<Mock<T, U>>, Handle<T, U>) {
let (svc, handle) = pair();
(Spawn::new(svc), handle)
}
/// A mock service
@ -212,21 +216,13 @@ impl<T, U> Drop for Mock<T, U> {
impl<T, U> Handle<T, U> {
/// Asynchronously gets the next request
pub fn poll_request(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Request<T, U>>> {
Box::pin(self.rx.recv()).as_mut().poll(cx)
pub fn poll_request(&mut self) -> Poll<Option<Request<T, U>>> {
tokio_test::task::spawn(()).enter(|cx, _| Box::pin(self.rx.recv()).as_mut().poll(cx))
}
/// Synchronously gets the next request.
///
/// This function blocks the current thread until a request is received.
pub fn next_request(mut self: Pin<&mut Self>) -> Option<Request<T, U>> {
use futures_util::future::poll_fn;
use tokio_test::block_on;
block_on(poll_fn(|cx| self.as_mut().poll_request(cx)))
/// Gets the next request.
pub async fn next_request(&mut self) -> Option<Request<T, U>> {
self.rx.recv().await
}
/// Allow a certain number of requests

View File

@ -0,0 +1,72 @@
//! Spawn mock services onto a mock task.
use std::task::Poll;
use tokio_test::task;
use tower_service::Service;
/// Service spawned on a mock task
#[derive(Debug)]
pub struct Spawn<T> {
inner: T,
task: task::Spawn<()>,
}
impl<T> Spawn<T> {
/// Create a new spawn.
pub fn new(inner: T) -> Self {
Self {
inner,
task: task::spawn(()),
}
}
/// Check if this service has been woken up.
pub fn is_woken(&self) -> bool {
self.task.is_woken()
}
/// Get how many futurs are holding onto the waker.
pub fn waker_ref_count(&self) -> usize {
self.task.waker_ref_count()
}
/// Poll this service ready.
pub fn poll_ready<Request>(&mut self) -> Poll<Result<(), T::Error>>
where
T: Service<Request>,
{
let task = &mut self.task;
let inner = &mut self.inner;
task.enter(|cx, _| inner.poll_ready(cx))
}
/// Call the inner Service.
pub fn call<Request>(&mut self, req: Request) -> T::Future
where
T: Service<Request>,
{
self.inner.call(req)
}
/// Get the inner service.
pub fn into_inner(self) -> T {
self.inner
}
/// Get a reference to the inner service.
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Get a mutable reference to the inner service.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
}
impl<T: Clone> Clone for Spawn<T> {
fn clone(&self) -> Self {
Spawn::new(self.inner.clone())
}
}

View File

@ -1,44 +1,29 @@
use futures_util::pin_mut;
use std::future::Future;
use tokio_test::{assert_pending, assert_ready};
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn single_request_ready() {
mock::task_fn::<String, String, _>(|cx, mock, handle| {
// No pending requests
assert!(handle.as_mut().poll_request(cx).is_pending());
#[tokio::test]
async fn single_request_ready() {
let (mut service, mut handle) = mock::spawn();
// Issue a request
assert_ready!(mock.poll_ready(cx)).unwrap();
assert_pending!(handle.poll_request());
let response = mock.call("hello?".into());
pin_mut!(response);
assert_ready!(service.poll_ready()).unwrap();
// Get the request from the handle
let send_response = assert_request_eq!(handle, "hello?");
let response = service.call("hello");
// Response is not ready
assert_pending!(response.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_response("world");
// Send the response
send_response.send_response("yes?".into());
assert_eq!(tokio_test::block_on(response).unwrap().as_str(), "yes?");
});
assert_eq!(response.await.unwrap(), "world");
}
#[test]
#[tokio::test]
#[should_panic]
fn backpressure() {
mock::task_fn::<String, String, _>(|cx, mock, handle| {
handle.allow(0);
async fn backpressure() {
let (mut service, mut handle) = mock::spawn::<_, ()>();
// Make sure the mock cannot accept more requests
assert_pending!(mock.poll_ready(cx));
handle.allow(0);
// Try to send a request
mock.call("hello?".into());
});
assert_pending!(service.poll_ready());
service.call("hello").await.unwrap();
}