208 lines
5.2 KiB
Rust
208 lines
5.2 KiB
Rust
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
|
|
use tower_limit::concurrency::ConcurrencyLimitLayer;
|
|
use tower_test::{assert_request_eq, mock};
|
|
|
|
#[tokio::test]
|
|
async fn basic_service_limit_functionality_with_poll_ready() {
|
|
let limit = ConcurrencyLimitLayer::new(2);
|
|
let (mut service, mut handle) = mock::spawn_layer(limit);
|
|
|
|
assert_ready_ok!(service.poll_ready());
|
|
let r1 = service.call("hello 1");
|
|
|
|
assert_ready_ok!(service.poll_ready());
|
|
let r2 = service.call("hello 2");
|
|
|
|
assert_pending!(service.poll_ready());
|
|
|
|
assert!(!service.is_woken());
|
|
|
|
// The request gets passed through
|
|
assert_request_eq!(handle, "hello 1").send_response("world 1");
|
|
|
|
// The next request gets passed through
|
|
assert_request_eq!(handle, "hello 2").send_response("world 2");
|
|
|
|
// There are no more requests
|
|
assert_pending!(handle.poll_request());
|
|
|
|
assert_eq!(r1.await.unwrap(), "world 1");
|
|
|
|
assert!(service.is_woken());
|
|
|
|
// Another request can be sent
|
|
assert_ready_ok!(service.poll_ready());
|
|
|
|
let r3 = service.call("hello 3");
|
|
|
|
assert_pending!(service.poll_ready());
|
|
|
|
assert_eq!(r2.await.unwrap(), "world 2");
|
|
|
|
// The request gets passed through
|
|
assert_request_eq!(handle, "hello 3").send_response("world 3");
|
|
|
|
assert_eq!(r3.await.unwrap(), "world 3");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn basic_service_limit_functionality_without_poll_ready() {
|
|
let limit = ConcurrencyLimitLayer::new(2);
|
|
let (mut service, mut handle) = mock::spawn_layer(limit);
|
|
|
|
assert_ready_ok!(service.poll_ready());
|
|
let r1 = service.call("hello 1");
|
|
|
|
assert_ready_ok!(service.poll_ready());
|
|
let r2 = service.call("hello 2");
|
|
|
|
assert_pending!(service.poll_ready());
|
|
|
|
// The request gets passed through
|
|
assert_request_eq!(handle, "hello 1").send_response("world 1");
|
|
|
|
assert!(!service.is_woken());
|
|
|
|
// The next request gets passed through
|
|
assert_request_eq!(handle, "hello 2").send_response("world 2");
|
|
|
|
assert!(!service.is_woken());
|
|
|
|
// There are no more requests
|
|
assert_pending!(handle.poll_request());
|
|
|
|
assert_eq!(r1.await.unwrap(), "world 1");
|
|
|
|
assert!(service.is_woken());
|
|
|
|
// One more request can be sent
|
|
assert_ready_ok!(service.poll_ready());
|
|
let r4 = service.call("hello 4");
|
|
|
|
assert_pending!(service.poll_ready());
|
|
|
|
assert_eq!(r2.await.unwrap(), "world 2");
|
|
assert!(service.is_woken());
|
|
|
|
// The request gets passed through
|
|
assert_request_eq!(handle, "hello 4").send_response("world 4");
|
|
|
|
assert_eq!(r4.await.unwrap(), "world 4");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn request_without_capacity() {
|
|
let limit = ConcurrencyLimitLayer::new(0);
|
|
let (mut service, _) = mock::spawn_layer::<(), (), _>(limit);
|
|
|
|
assert_pending!(service.poll_ready());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn reserve_capacity_without_sending_request() {
|
|
let limit = ConcurrencyLimitLayer::new(1);
|
|
let (mut s1, mut handle) = mock::spawn_layer(limit);
|
|
|
|
let mut s2 = s1.clone();
|
|
|
|
// Reserve capacity in s1
|
|
assert_ready_ok!(s1.poll_ready());
|
|
|
|
// Service 2 cannot get capacity
|
|
assert_pending!(s2.poll_ready());
|
|
|
|
// s1 sends the request, then s2 is able to get capacity
|
|
let r1 = s1.call("hello");
|
|
|
|
assert_request_eq!(handle, "hello").send_response("world");
|
|
|
|
assert_pending!(s2.poll_ready());
|
|
|
|
r1.await.unwrap();
|
|
|
|
assert_ready_ok!(s2.poll_ready());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn service_drop_frees_capacity() {
|
|
let limit = ConcurrencyLimitLayer::new(1);
|
|
let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
|
|
|
|
let mut s2 = s1.clone();
|
|
|
|
// Reserve capacity in s1
|
|
assert_ready_ok!(s1.poll_ready());
|
|
|
|
// Service 2 cannot get capacity
|
|
assert_pending!(s2.poll_ready());
|
|
|
|
drop(s1);
|
|
|
|
assert!(s2.is_woken());
|
|
assert_ready_ok!(s2.poll_ready());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn response_error_releases_capacity() {
|
|
let limit = ConcurrencyLimitLayer::new(1);
|
|
let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit);
|
|
|
|
let mut s2 = s1.clone();
|
|
|
|
// Reserve capacity in s1
|
|
assert_ready_ok!(s1.poll_ready());
|
|
|
|
// s1 sends the request, then s2 is able to get capacity
|
|
let r1 = s1.call("hello");
|
|
|
|
assert_request_eq!(handle, "hello").send_error("boom");
|
|
|
|
r1.await.unwrap_err();
|
|
|
|
assert_ready_ok!(s2.poll_ready());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn response_future_drop_releases_capacity() {
|
|
let limit = ConcurrencyLimitLayer::new(1);
|
|
let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit);
|
|
|
|
let mut s2 = s1.clone();
|
|
|
|
// Reserve capacity in s1
|
|
assert_ready_ok!(s1.poll_ready());
|
|
|
|
// s1 sends the request, then s2 is able to get capacity
|
|
let r1 = s1.call("hello");
|
|
|
|
assert_pending!(s2.poll_ready());
|
|
|
|
drop(r1);
|
|
|
|
assert_ready_ok!(s2.poll_ready());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn multi_waiters() {
|
|
let limit = ConcurrencyLimitLayer::new(1);
|
|
let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
|
|
let mut s2 = s1.clone();
|
|
let mut s3 = s1.clone();
|
|
|
|
// Reserve capacity in s1
|
|
assert_ready_ok!(s1.poll_ready());
|
|
|
|
// s2 and s3 are not ready
|
|
assert_pending!(s2.poll_ready());
|
|
assert_pending!(s3.poll_ready());
|
|
|
|
drop(s1);
|
|
|
|
assert!(s2.is_woken());
|
|
assert!(!s3.is_woken());
|
|
|
|
drop(s2);
|
|
|
|
assert!(s3.is_woken());
|
|
}
|