Update tower-limit to std::future (#324)

This bumps tower-limit to 0.3.0-alpha.1
This commit is contained in:
Jon Gjengset 2019-09-09 12:09:41 -04:00 committed by GitHub
parent 390e124525
commit 154bd69b9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 167 additions and 199 deletions

View File

@ -8,7 +8,7 @@ members = [
# "tower-filter",
# "tower-hedge",
"tower-layer",
# "tower-limit",
"tower-limit",
"tower-load",
"tower-load-shed",
# "tower-reconnect",

View File

@ -1,3 +1,7 @@
# 0.3.0-alpha.1
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -8,13 +8,13 @@ name = "tower-limit"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-limit/0.1.0"
documentation = "https://docs.rs/tower-limit/0.3.0-alpha.1"
description = """
Limit maximum request rate to a `Service`.
"""
@ -22,13 +22,14 @@ categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures = "0.1.26"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tokio-sync = "0.1.3"
tokio-timer = "0.2.6"
futures-core-preview = "0.3.0-alpha.18"
tower-service = "0.3.0-alpha.1"
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
tokio-sync = "0.2.0-alpha.4"
tokio-timer = "0.3.0-alpha.4"
pin-project = "0.4.0-alpha.9"
[dev-dependencies]
tower-test = { version = "0.1", path = "../tower-test" }
tokio = "0.1.19"
tokio-mock-task = "0.1.1"
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "0.2.0-alpha.4"
futures-util-preview = "0.3.0-alpha.18"

View File

@ -1,13 +1,21 @@
//! Future types
//!
use super::Error;
use futures::{Future, Poll};
use futures_core::ready;
use pin_project::{pin_project, pinned_drop};
use std::sync::Arc;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_sync::semaphore::Semaphore;
/// Future for the `ConcurrencyLimit` service.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
semaphore: Arc<Semaphore>,
}
@ -18,21 +26,19 @@ impl<T> ResponseFuture<T> {
}
}
impl<T> Future for ResponseFuture<T>
impl<F, T, E> Future for ResponseFuture<F>
where
T: Future,
T::Error: Into<Error>,
F: Future<Output = Result<T, E>>,
E: Into<Error>,
{
type Item = T::Item;
type Error = Error;
type Output = Result<T, Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(Into::into)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.project().inner.poll(cx)).map_err(Into::into))
}
}
impl<T> Drop for ResponseFuture<T> {
fn drop(&mut self) {
self.semaphore.add_permits(1);
}
#[pinned_drop]
fn drop_response_future<T>(mut rfut: Pin<&mut ResponseFuture<T>>) {
rfut.project().semaphore.add_permits(1);
}

View File

@ -2,8 +2,9 @@ use super::{future::ResponseFuture, Error};
use tower_service::Service;
use futures::{try_ready, Poll};
use futures_core::ready;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_sync::semaphore::{self, Semaphore};
/// Enforces a limit on the concurrent number of requests the underlying
@ -57,14 +58,10 @@ where
type Error = Error;
type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
try_ready!(self
.limit
.permit
.poll_acquire(&self.limit.semaphore)
.map_err(Error::from));
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.limit.permit.poll_acquire(cx, &self.limit.semaphore))?;
self.inner.poll_ready().map_err(Into::into)
Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into))
}
fn call(&mut self, request: Request) -> Self::Future {

View File

@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-limit/0.1.0")]
#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0-alpha.1")]
#![cfg_attr(test, deny(warnings))]
#![deny(missing_debug_implementations, missing_docs, rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]

View File

@ -1,11 +1,19 @@
//! Future types
use super::error::Error;
use futures::{Future, Poll};
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future for the `RateLimit` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
}
@ -15,15 +23,14 @@ impl<T> ResponseFuture<T> {
}
}
impl<T> Future for ResponseFuture<T>
impl<F, T, E> Future for ResponseFuture<F>
where
T: Future,
Error: From<T::Error>,
F: Future<Output = Result<T, E>>,
Error: From<E>,
{
type Item = T::Item;
type Error = Error;
type Output = Result<T, Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(Into::into)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(self.project().inner.poll(cx))?))
}
}

View File

@ -1,5 +1,10 @@
use super::{error::Error, future::ResponseFuture, Rate};
use futures::{try_ready, Future, Poll};
use futures_core::ready;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_timer::{clock, Delay};
use tower_service::Service;
@ -61,11 +66,13 @@ where
type Error = Error;
type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.state {
State::Ready { .. } => return self.inner.poll_ready().map_err(Into::into),
State::Ready { .. } => {
return Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Error::from))
}
State::Limited(ref mut sleep) => {
try_ready!(sleep.poll());
ready!(Pin::new(sleep).poll(cx));
}
}
@ -74,7 +81,7 @@ where
rem: self.rate.num(),
};
self.inner.poll_ready().map_err(Into::into)
Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Error::from))
}
fn call(&mut self, request: Request) -> Self::Future {
@ -95,7 +102,7 @@ where
self.state = State::Ready { until, rem };
} else {
// The service is disabled until further notice
let sleep = Delay::new(until);
let sleep = tokio_timer::delay(until);
self.state = State::Limited(sleep);
}

View File

@ -1,134 +1,106 @@
use futures::{
self,
future::{poll_fn, Future},
};
use tokio_mock_task::MockTask;
use futures_util::{future::poll_fn, pin_mut};
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, block_on, task::MockTask};
use tower_limit::concurrency::ConcurrencyLimit;
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
macro_rules! assert_ready {
($e:expr) => {{
use futures::Async::*;
match $e {
Ok(Ready(v)) => v,
Ok(NotReady) => panic!("not ready"),
Err(e) => panic!("err = {:?}", e),
}
}};
}
macro_rules! assert_not_ready {
($e:expr) => {{
use futures::Async::*;
match $e {
Ok(NotReady) => {}
r => panic!("unexpected poll status = {:?}", r),
}
}};
}
#[test]
fn basic_service_limit_functionality_with_poll_ready() {
let mut task = MockTask::new();
let (mut service, mut handle) = new_service(2);
let (mut service, handle) = new_service(2);
pin_mut!(handle);
poll_fn(|| service.poll_ready()).wait().unwrap();
block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap();
let r1 = service.call("hello 1");
poll_fn(|| service.poll_ready()).wait().unwrap();
block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap();
let r2 = service.call("hello 2");
task.enter(|| {
assert!(service.poll_ready().unwrap().is_not_ready());
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert!(!task.is_notified());
assert!(!task.is_woken());
// The request gets passed through
assert_request_eq!(handle, "hello 1").send_response("world 1");
assert_request_eq!(handle.as_mut(), "hello 1").send_response("world 1");
// The next request gets passed through
assert_request_eq!(handle, "hello 2").send_response("world 2");
assert_request_eq!(handle.as_mut(), "hello 2").send_response("world 2");
// There are no more requests
task.enter(|| {
assert!(handle.poll_request().unwrap().is_not_ready());
task.enter(|cx| {
assert_pending!(handle.as_mut().poll_request(cx));
});
assert_eq!(r1.wait().unwrap(), "world 1");
assert!(task.is_notified());
assert_eq!(block_on(r1).unwrap(), "world 1");
assert!(task.is_woken());
// Another request can be sent
task.enter(|| {
assert!(service.poll_ready().unwrap().is_ready());
task.enter(|cx| {
assert_ready_ok!(service.poll_ready(cx));
});
let r3 = service.call("hello 3");
task.enter(|| {
assert!(service.poll_ready().unwrap().is_not_ready());
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert_eq!(r2.wait().unwrap(), "world 2");
assert_eq!(block_on(r2).unwrap(), "world 2");
// The request gets passed through
assert_request_eq!(handle, "hello 3").send_response("world 3");
assert_request_eq!(handle.as_mut(), "hello 3").send_response("world 3");
assert_eq!(r3.wait().unwrap(), "world 3");
assert_eq!(block_on(r3).unwrap(), "world 3");
}
#[test]
fn basic_service_limit_functionality_without_poll_ready() {
let mut task = MockTask::new();
let (mut service, mut handle) = new_service(2);
let (mut service, handle) = new_service(2);
pin_mut!(handle);
assert_ready!(service.poll_ready());
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let r1 = service.call("hello 1");
assert_ready!(service.poll_ready());
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let r2 = service.call("hello 2");
task.enter(|| {
assert_not_ready!(service.poll_ready());
});
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
// The request gets passed through
assert_request_eq!(handle, "hello 1").send_response("world 1");
assert_request_eq!(handle.as_mut(), "hello 1").send_response("world 1");
assert!(!task.is_notified());
assert!(!task.is_woken());
// The next request gets passed through
assert_request_eq!(handle, "hello 2").send_response("world 2");
assert_request_eq!(handle.as_mut(), "hello 2").send_response("world 2");
assert!(!task.is_notified());
assert!(!task.is_woken());
// There are no more requests
task.enter(|| {
assert!(handle.poll_request().unwrap().is_not_ready());
});
assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx)));
assert_eq!(r1.wait().unwrap(), "world 1");
assert_eq!(block_on(r1).unwrap(), "world 1");
assert!(task.is_notified());
assert!(task.is_woken());
// One more request can be sent
assert_ready!(service.poll_ready());
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let r4 = service.call("hello 4");
task.enter(|| {
assert_not_ready!(service.poll_ready());
});
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_eq!(r2.wait().unwrap(), "world 2");
assert!(task.is_notified());
assert_eq!(block_on(r2).unwrap(), "world 2");
assert!(task.is_woken());
// The request gets passed through
assert_request_eq!(handle, "hello 4").send_response("world 4");
assert_eq!(r4.wait().unwrap(), "world 4");
assert_eq!(block_on(r4).unwrap(), "world 4");
}
#[test]
@ -137,8 +109,8 @@ fn request_without_capacity() {
let (mut service, _) = new_service(0);
task.enter(|| {
assert_not_ready!(service.poll_ready());
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
}
@ -146,18 +118,19 @@ fn request_without_capacity() {
fn reserve_capacity_without_sending_request() {
let mut task = MockTask::new();
let (mut s1, mut handle) = new_service(1);
let (mut s1, handle) = new_service(1);
pin_mut!(handle);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|| {
assert!(s1.poll_ready().unwrap().is_ready());
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
// Service 2 cannot get capacity
task.enter(|| {
assert!(s2.poll_ready().unwrap().is_not_ready());
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
// s1 sends the request, then s2 is able to get capacity
@ -165,14 +138,14 @@ fn reserve_capacity_without_sending_request() {
assert_request_eq!(handle, "hello").send_response("world");
task.enter(|| {
assert!(s2.poll_ready().unwrap().is_not_ready());
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
r1.wait().unwrap();
block_on(r1).unwrap();
task.enter(|| {
assert!(s2.poll_ready().unwrap().is_ready());
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
}
@ -185,30 +158,31 @@ fn service_drop_frees_capacity() {
let mut s2 = s1.clone();
// Reserve capacity in s1
assert_ready!(s1.poll_ready());
assert_ready_ok!(task.enter(|cx| s1.poll_ready(cx)));
// Service 2 cannot get capacity
task.enter(|| {
assert_not_ready!(s2.poll_ready());
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
drop(s1);
assert!(task.is_notified());
assert_ready!(s2.poll_ready());
assert!(task.is_woken());
assert_ready_ok!(task.enter(|cx| s2.poll_ready(cx)));
}
#[test]
fn response_error_releases_capacity() {
let mut task = MockTask::new();
let (mut s1, mut handle) = new_service(1);
let (mut s1, handle) = new_service(1);
pin_mut!(handle);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|| {
assert_ready!(s1.poll_ready());
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
// s1 sends the request, then s2 is able to get capacity
@ -216,10 +190,10 @@ fn response_error_releases_capacity() {
assert_request_eq!(handle, "hello").send_error("boom");
r1.wait().unwrap_err();
block_on(r1).unwrap_err();
task.enter(|| {
assert!(s2.poll_ready().unwrap().is_ready());
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
}
@ -232,21 +206,21 @@ fn response_future_drop_releases_capacity() {
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|| {
assert_ready!(s1.poll_ready());
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
task.enter(|| {
assert_not_ready!(s2.poll_ready());
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
drop(r1);
task.enter(|| {
assert!(s2.poll_ready().unwrap().is_ready());
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
}
@ -261,20 +235,20 @@ fn multi_waiters() {
let mut s3 = s1.clone();
// Reserve capacity in s1
task1.enter(|| assert_ready!(s1.poll_ready()));
task1.enter(|cx| assert_ready_ok!(s1.poll_ready(cx)));
// s2 and s3 are not ready
task2.enter(|| assert_not_ready!(s2.poll_ready()));
task3.enter(|| assert_not_ready!(s3.poll_ready()));
task2.enter(|cx| assert_pending!(s2.poll_ready(cx)));
task3.enter(|cx| assert_pending!(s3.poll_ready(cx)));
drop(s1);
assert!(task2.is_notified());
assert!(!task3.is_notified());
assert!(task2.is_woken());
assert!(!task3.is_woken());
drop(s2);
assert!(task3.is_notified());
assert!(task3.is_woken());
}
type Mock = mock::Mock<&'static str, &'static str>;

View File

@ -1,69 +1,41 @@
use futures::future;
use tokio::runtime::current_thread::Runtime;
use tokio_timer::Delay;
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task::MockTask};
use tower_limit::rate::*;
use tower_service::*;
use tower_test::{assert_request_eq, mock};
use std::time::{Duration, Instant};
macro_rules! assert_ready {
($e:expr) => {{
use futures::Async::*;
match $e {
Ok(Ready(v)) => v,
Ok(NotReady) => panic!("not ready"),
Err(e) => panic!("err = {:?}", e),
}
}};
}
macro_rules! assert_not_ready {
($e:expr) => {{
use futures::Async::*;
match $e {
Ok(NotReady) => {}
r => panic!("unexpected poll status = {:?}", r),
}
}};
}
use std::future::Future;
use std::time::Duration;
#[test]
fn reaching_capacity() {
let mut rt = Runtime::new().unwrap();
let (mut service, mut handle) = new_service(Rate::new(1, from_millis(100)));
clock::mock(|time| {
let mut task = MockTask::new();
assert_ready!(service.poll_ready());
let response = service.call("hello");
let (mut service, handle) = new_service(Rate::new(1, from_millis(100)));
pin_mut!(handle);
assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let response = service.call("hello");
pin_mut!(response);
let response = rt.block_on(response);
assert_eq!(response.unwrap(), "world");
assert_request_eq!(handle.as_mut(), "hello").send_response("world");
assert_ready_ok!(task.enter(|cx| response.poll(cx)), "world");
rt.block_on(future::lazy(|| {
assert_not_ready!(service.poll_ready());
Ok::<_, ()>(())
}))
.unwrap();
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx)));
let poll_request = rt.block_on(future::lazy(|| handle.poll_request()));
assert!(poll_request.unwrap().is_not_ready());
time.advance(Duration::from_millis(100));
// Unlike `thread::sleep`, this advances the timer.
rt.block_on(Delay::new(Instant::now() + Duration::from_millis(100)))
.unwrap();
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let poll_ready = rt.block_on(future::lazy(|| service.poll_ready()));
assert_ready!(poll_ready);
// Send a second request
let response = service.call("two");
pin_mut!(response);
// Send a second request
let response = service.call("two");
assert_request_eq!(handle, "two").send_response("done");
let response = rt.block_on(response);
assert_eq!(response.unwrap(), "done");
assert_request_eq!(handle.as_mut(), "two").send_response("done");
assert_ready_ok!(task.enter(|cx| response.poll(cx)), "done");
});
}
type Mock = mock::Mock<&'static str, &'static str>;