balance: Add a stress test for p2c

The hope for this was to reproduce #415 (which it does not sadly), but
at least it adds a test for p2c!
This commit is contained in:
Jon Gjengset 2020-02-20 12:59:45 -05:00
parent 47c3a14560
commit 650e5be58e
2 changed files with 168 additions and 1 deletions

View File

@ -45,7 +45,7 @@ slab = "0.4"
tracing-subscriber = "0.1.1"
hdrhistogram = "6.0"
quickcheck = { version = "0.6", default-features = false }
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "0.2", features = ["macros", "stream"] }
tokio-test = "0.2"
tower-buffer = { version = "0.3", path = "../tower-buffer" }
tower-limit = { version = "0.3", path = "../tower-limit" }

167
tower-balance/tests/p2c.rs Normal file
View File

@ -0,0 +1,167 @@
use std::future::Future;
use std::task::{Context, Poll};
use tokio_test::{assert_pending, assert_ready, task};
use tower_balance::p2c::Balance;
use tower_discover::{Change, ServiceStream};
use tower_service::Service;
use tower_test::mock;
type Req = &'static str;
struct Mock(mock::Mock<Req, Req>);
impl Service<Req> for Mock {
type Response = <mock::Mock<Req, Req> as Service<Req>>::Response;
type Error = <mock::Mock<Req, Req> as Service<Req>>::Error;
type Future = <mock::Mock<Req, Req> as Service<Req>>::Future;
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: Req) -> Self::Future {
self.0.call(req)
}
}
impl tower_load::Load for Mock {
type Metric = usize;
fn load(&self) -> Self::Metric {
rand::random()
}
}
#[test]
fn stress() {
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
let mut cache = Balance::<_, Req>::from_entropy(ServiceStream::new(rx));
let mut nready = 0;
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();
let mut retired = Vec::<mock::Handle<Req, Req>>::new();
for _ in 0..100_000 {
for _ in 0..(rand::random::<u8>() % 8) {
if !services.is_empty() && rand::random() {
if nready == 0 || rand::random::<u8>() > u8::max_value() / 4 {
// ready a service
// TODO: sometimes ready a removed service?
for (_, (handle, ready)) in &mut services {
if !*ready {
handle.allow(1);
*ready = true;
nready += 1;
break;
}
}
} else {
// use a service
use std::task::Poll;
match task.enter(|cx, _| cache.poll_ready(cx)) {
Poll::Ready(Ok(())) => {
// assert_ne!(nready, 0, "got ready when no service is ready");
let mut fut = cache.call("hello");
let mut fut = std::pin::Pin::new(&mut fut);
assert_pending!(task.enter(|cx, _| fut.as_mut().poll(cx)));
let mut found = false;
for (_, (handle, ready)) in &mut services {
if *ready {
if let Poll::Ready(Some((req, res))) = handle.poll_request() {
assert_eq!(req, "hello");
res.send_response("world");
*ready = false;
nready -= 1;
found = true;
break;
}
}
}
if !found {
// we must have been given a retired service
let mut at = None;
for (i, handle) in retired.iter_mut().enumerate() {
if let Poll::Ready(Some((req, res))) = handle.poll_request() {
assert_eq!(req, "hello");
res.send_response("world");
at = Some(i);
break;
}
}
let _ = retired.swap_remove(
at.expect("request was not sent to a ready service"),
);
nready -= 1;
}
assert_ready!(task.enter(|cx, _| fut.as_mut().poll(cx))).unwrap();
}
Poll::Ready(_) => unreachable!("discover stream has not failed"),
Poll::Pending => {
// assert_eq!(nready, 0, "got pending when a service is ready");
}
}
}
} else if services.is_empty() || rand::random() {
if services.is_empty() || nready == 0 || rand::random() {
// add
let (svc, mut handle) = mock::pair::<Req, Req>();
let svc = Mock(svc);
handle.allow(0);
let k = services.insert((handle, false));
let ok = tx.send(Ok(Change::Insert(k, svc)));
assert!(ok.is_ok());
} else {
// remove
while !services.is_empty() {
let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
if services.contains(k) {
let (handle, ready) = services.remove(k);
if ready {
retired.push(handle);
}
let ok = tx.send(Ok(Change::Remove(k)));
assert!(ok.is_ok());
break;
}
}
}
} else {
// fail a service
while !services.is_empty() {
let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
if services.contains(k) {
let (mut handle, ready) = services.remove(k);
if ready {
nready -= 1;
}
handle.send_error("doom");
break;
}
}
}
}
let r = task.enter(|cx, _| cache.poll_ready(cx));
// drop any retired services that the p2c has gotten rid of
let mut removed = Vec::new();
for (i, handle) in retired.iter_mut().enumerate() {
if let Poll::Ready(None) = handle.poll_request() {
removed.push(i);
}
}
for i in removed.into_iter().rev() {
retired.swap_remove(i);
nready -= 1;
}
use std::task::Poll;
match r {
Poll::Ready(Ok(())) => {
assert_ne!(nready, 0, "got ready when no service is ready");
}
Poll::Ready(_) => unreachable!("discover stream has not failed"),
Poll::Pending => {
assert_eq!(nready, 0, "got pending when a service is ready");
}
}
// assert_eq!(cache.len(), services.len());
}
}