From 650e5be58eb424ae3f26fb48fd862fdf0f36d87c Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Thu, 20 Feb 2020 12:59:45 -0500 Subject: [PATCH] balance: Add a stress test for p2c The hope for this was to reproduce #415 (which it does not sadly), but at least it adds a test for p2c! --- tower-balance/Cargo.toml | 2 +- tower-balance/tests/p2c.rs | 167 +++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 tower-balance/tests/p2c.rs diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index d4c1966..ea41c9f 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -45,7 +45,7 @@ slab = "0.4" tracing-subscriber = "0.1.1" hdrhistogram = "6.0" quickcheck = { version = "0.6", default-features = false } -tokio = { version = "0.2", features = ["macros"] } +tokio = { version = "0.2", features = ["macros", "stream"] } tokio-test = "0.2" tower-buffer = { version = "0.3", path = "../tower-buffer" } tower-limit = { version = "0.3", path = "../tower-limit" } diff --git a/tower-balance/tests/p2c.rs b/tower-balance/tests/p2c.rs new file mode 100644 index 0000000..e083af1 --- /dev/null +++ b/tower-balance/tests/p2c.rs @@ -0,0 +1,167 @@ +use std::future::Future; +use std::task::{Context, Poll}; +use tokio_test::{assert_pending, assert_ready, task}; +use tower_balance::p2c::Balance; +use tower_discover::{Change, ServiceStream}; +use tower_service::Service; +use tower_test::mock; + +type Req = &'static str; +struct Mock(mock::Mock); + +impl Service for Mock { + type Response = as Service>::Response; + type Error = as Service>::Error; + type Future = as Service>::Future; + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + self.0.poll_ready(cx) + } + fn call(&mut self, req: Req) -> Self::Future { + self.0.call(req) + } +} + +impl tower_load::Load for Mock { + type Metric = usize; + fn load(&self) -> Self::Metric { + rand::random() + } +} + +#[test] +fn stress() { + let mut task = task::spawn(()); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); + let mut cache = Balance::<_, Req>::from_entropy(ServiceStream::new(rx)); + + let mut nready = 0; + let mut services = slab::Slab::<(mock::Handle, bool)>::new(); + let mut retired = Vec::>::new(); + for _ in 0..100_000 { + for _ in 0..(rand::random::() % 8) { + if !services.is_empty() && rand::random() { + if nready == 0 || rand::random::() > u8::max_value() / 4 { + // ready a service + // TODO: sometimes ready a removed service? + for (_, (handle, ready)) in &mut services { + if !*ready { + handle.allow(1); + *ready = true; + nready += 1; + break; + } + } + } else { + // use a service + use std::task::Poll; + match task.enter(|cx, _| cache.poll_ready(cx)) { + Poll::Ready(Ok(())) => { + // assert_ne!(nready, 0, "got ready when no service is ready"); + let mut fut = cache.call("hello"); + let mut fut = std::pin::Pin::new(&mut fut); + assert_pending!(task.enter(|cx, _| fut.as_mut().poll(cx))); + let mut found = false; + for (_, (handle, ready)) in &mut services { + if *ready { + if let Poll::Ready(Some((req, res))) = handle.poll_request() { + assert_eq!(req, "hello"); + res.send_response("world"); + *ready = false; + nready -= 1; + found = true; + break; + } + } + } + if !found { + // we must have been given a retired service + let mut at = None; + for (i, handle) in retired.iter_mut().enumerate() { + if let Poll::Ready(Some((req, res))) = handle.poll_request() { + assert_eq!(req, "hello"); + res.send_response("world"); + at = Some(i); + break; + } + } + let _ = retired.swap_remove( + at.expect("request was not sent to a ready service"), + ); + nready -= 1; + } + assert_ready!(task.enter(|cx, _| fut.as_mut().poll(cx))).unwrap(); + } + Poll::Ready(_) => unreachable!("discover stream has not failed"), + Poll::Pending => { + // assert_eq!(nready, 0, "got pending when a service is ready"); + } + } + } + } else if services.is_empty() || rand::random() { + if services.is_empty() || nready == 0 || rand::random() { + // add + let (svc, mut handle) = mock::pair::(); + let svc = Mock(svc); + handle.allow(0); + let k = services.insert((handle, false)); + let ok = tx.send(Ok(Change::Insert(k, svc))); + assert!(ok.is_ok()); + } else { + // remove + while !services.is_empty() { + let k = rand::random::() % (services.iter().last().unwrap().0 + 1); + if services.contains(k) { + let (handle, ready) = services.remove(k); + if ready { + retired.push(handle); + } + let ok = tx.send(Ok(Change::Remove(k))); + assert!(ok.is_ok()); + break; + } + } + } + } else { + // fail a service + while !services.is_empty() { + let k = rand::random::() % (services.iter().last().unwrap().0 + 1); + if services.contains(k) { + let (mut handle, ready) = services.remove(k); + if ready { + nready -= 1; + } + handle.send_error("doom"); + break; + } + } + } + } + + let r = task.enter(|cx, _| cache.poll_ready(cx)); + + // drop any retired services that the p2c has gotten rid of + let mut removed = Vec::new(); + for (i, handle) in retired.iter_mut().enumerate() { + if let Poll::Ready(None) = handle.poll_request() { + removed.push(i); + } + } + for i in removed.into_iter().rev() { + retired.swap_remove(i); + nready -= 1; + } + + use std::task::Poll; + match r { + Poll::Ready(Ok(())) => { + assert_ne!(nready, 0, "got ready when no service is ready"); + } + Poll::Ready(_) => unreachable!("discover stream has not failed"), + Poll::Pending => { + assert_eq!(nready, 0, "got pending when a service is ready"); + } + } + + // assert_eq!(cache.len(), services.len()); + } +}