Notify Pool when Services are dropped (#301)

Prior to this change, when `Balance` dropped a failing service, `Pool`
would not be notified of this fact. This meant that it never updated
`.services`, and so it might not add a new backing `Service` (e.g., due
to `max_services`) even though no working backing services exist.

With this change, dropped services notify the `Pool` so that it knows to
re-check its limits. The pool also gains some much-needed tests.
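
In short, each constructed service is wrapped in a notifier that remembers its slab key and sends it over a channel from its `Drop` impl; the discoverer drains that channel on every poll, so its service count is accurate before the `max_services` check. Below is a minimal, self-contained sketch of that pattern only — it uses `std::sync::mpsc` and plain placeholder types rather than the real `tokio_sync` channel, `Service`, and `Slab` machinery from the diff, and the names (`DropNotify`, the demo `main`) are illustrative:

```rust
use std::sync::mpsc;

/// Wraps a value and reports its pool slot over a channel when it is dropped.
struct DropNotify<T> {
    inner: T,
    id: usize,
    notify: mpsc::Sender<usize>,
}

impl<T> Drop for DropNotify<T> {
    fn drop(&mut self) {
        // Ignore send errors: if the pool side is already gone, there is nothing to notify.
        let _ = self.notify.send(self.id);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let svc = DropNotify {
        inner: "backing service",
        id: 0,
        notify: tx,
    };
    println!("using {}", svc.inner);

    // Dropping the wrapper is what tells the pool that slot 0 is free again.
    drop(svc);

    // On its next poll, the pool drains the channel and updates its view of how
    // many live services it has before deciding whether to construct more.
    for freed in rx.try_iter() {
        println!("slot {} freed; pool may construct a replacement", freed);
    }
}
```

In the actual change below, the wrapper is `DropNotifyService`, the channel is a `tokio_sync::mpsc::unbounded_channel`, and the drain happens at the top of `PoolDiscoverer::poll`, before the service-limit check.
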
Jon Gjengset 2019-07-15 13:50:01 -04:00 committed by GitHub
parent 491dfbe634
commit 40fbb85c4b
3 changed files with 380 additions and 27 deletions

@@ -38,6 +38,7 @@ tower-layer = "0.1.0"
tower-load = { version = "0.1.0", path = "../tower-load" }
tower-service = "0.2.0"
tower-util = "0.1.0"
slab = "0.4"
[dev-dependencies]
tracing-fmt = { git = "https://github.com/tokio-rs/tracing.git" }

@@ -15,12 +15,17 @@
#![deny(missing_docs)]
use super::p2c::Balance;
use futures::{try_ready, Async, Future, Poll};
use crate::error;
use futures::{try_ready, Async, Future, Poll, Stream};
use slab::Slab;
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_service::Service;
use tower_util::MakeService;
#[cfg(test)]
mod test;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Level {
/// Load is low -- remove a service instance.
@@ -41,24 +46,38 @@ where
making: Option<MS::Future>,
target: Target,
load: Level,
services: usize,
services: Slab<()>,
died_tx: tokio_sync::mpsc::UnboundedSender<usize>,
died_rx: tokio_sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
}
impl<MS, Target, Request> Discover for PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
// NOTE: these bounds should go away once MakeService adopts Box<dyn Error>
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
type Key = usize;
type Service = MS::Service;
type Service = DropNotifyService<MS::Service>;
type Error = MS::MakeError;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
if self.services == 0 && self.making.is_none() {
while let Async::Ready(Some(sid)) = self
.died_rx
.poll()
.expect("cannot be closed as we hold tx too")
{
self.services.remove(sid);
tracing::trace!(
pool.services = self.services.len(),
message = "removing dropped service"
);
}
if self.services.len() == 0 && self.making.is_none() {
let _ = try_ready!(self.maker.poll_ready());
tracing::trace!("construct initial pool connection");
self.making = Some(self.maker.make_service(self.target.clone()));
}
@@ -67,14 +86,14 @@ where
if self.making.is_none() {
if self
.limit
.map(|limit| self.services >= limit)
.map(|limit| self.services.len() >= limit)
.unwrap_or(false)
{
return Ok(Async::NotReady);
}
tracing::trace!(
pool.services = self.services,
pool.services = self.services.len(),
message = "decided to add service to loaded pool"
);
try_ready!(self.maker.poll_ready());
@@ -85,14 +104,19 @@
}
if let Some(mut fut) = self.making.take() {
if let Async::Ready(s) = fut.poll()? {
self.services += 1;
if let Async::Ready(svc) = fut.poll()? {
let id = self.services.insert(());
let svc = DropNotifyService {
svc,
id,
notify: self.died_tx.clone(),
};
tracing::trace!(
pool.services = self.services,
pool.services = self.services.len(),
message = "finished creating new service"
);
self.load = Level::Normal;
return Ok(Async::Ready(Change::Insert(self.services, s)));
return Ok(Async::Ready(Change::Insert(id, svc)));
} else {
self.making = Some(fut);
return Ok(Async::NotReady);
@@ -104,13 +128,15 @@ where
unreachable!("found high load but no Service being made");
}
Level::Normal => Ok(Async::NotReady),
Level::Low if self.services == 1 => Ok(Async::NotReady),
Level::Low if self.services.len() == 1 => Ok(Async::NotReady),
Level::Low => {
self.load = Level::Normal;
let rm = self.services;
self.services -= 1;
// NOTE: this is a little sad -- we'd prefer to kill short-lived services
let rm = self.services.iter().next().unwrap().0;
// note that we _don't_ remove from self.services here
// that'll happen automatically on drop
tracing::trace!(
pool.services = self.services,
pool.services = self.services.len(),
message = "removing service for over-provisioned pool"
);
Ok(Async::Ready(Change::Remove(rm)))
@@ -224,16 +250,19 @@ impl Builder {
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
let (died_tx, died_rx) = tokio_sync::mpsc::unbounded_channel();
let d = PoolDiscoverer {
maker: make_service,
making: None,
target,
load: Level::Normal,
services: 0,
services: Slab::new(),
died_tx,
died_rx,
limit: self.limit,
};
@@ -249,8 +278,8 @@ impl Builder {
pub struct Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
balance: Balance<PoolDiscoverer<MS, Target, Request>, Request>,
@@ -263,8 +292,8 @@ where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
/// Construct a new dynamically sized `Pool`.
@@ -283,8 +312,8 @@ where
MS: MakeService<Target, Req>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
type Response = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
@@ -304,7 +333,7 @@ where
}
discover.load = Level::Low;
if discover.services > 1 {
if discover.services.len() > 1 {
// reset EWMA so we don't immediately try to remove another service
self.ewma = self.options.init;
}
@@ -334,6 +363,10 @@ where
// `Ready`, so we won't try to launch another service immediately.
// we clamp it to high though in case the # of services is limited.
self.ewma = self.options.high;
// we need to call balance again for the PoolDiscoverer to realize
// it can make a new service
return self.balance.poll_ready();
} else {
discover.load = Level::Normal;
}
@@ -346,3 +379,37 @@ where
self.balance.call(req)
}
}
#[doc(hidden)]
pub struct DropNotifyService<Svc> {
svc: Svc,
id: usize,
notify: tokio_sync::mpsc::UnboundedSender<usize>,
}
impl<Svc> Drop for DropNotifyService<Svc> {
fn drop(&mut self) {
let _ = self.notify.try_send(self.id).is_ok();
}
}
impl<Svc: Load> Load for DropNotifyService<Svc> {
type Metric = Svc::Metric;
fn load(&self) -> Self::Metric {
self.svc.load()
}
}
impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc> {
type Response = Svc::Response;
type Future = Svc::Future;
type Error = Svc::Error;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.svc.poll_ready()
}
fn call(&mut self, req: Request) -> Self::Future {
self.svc.call(req)
}
}

@@ -0,0 +1,285 @@
use futures::{Async, Future};
use tower_load as load;
use tower_service::Service;
use tower_test::mock;
use super::*;
macro_rules! assert_fut_ready {
($fut:expr, $val:expr) => {{
assert_fut_ready!($fut, $val, "must be ready");
}};
($fut:expr, $val:expr, $msg:expr) => {{
assert_eq!(
$fut.poll().expect("must not fail"),
Async::Ready($val),
$msg
);
}};
}
macro_rules! assert_ready {
($svc:expr) => {{
assert_ready!($svc, "must be ready");
}};
($svc:expr, $msg:expr) => {{
assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg);
}};
}
macro_rules! assert_fut_not_ready {
($fut:expr) => {{
assert_fut_not_ready!($fut, "must not be ready");
}};
($fut:expr, $msg:expr) => {{
assert!(!$fut.poll().expect("must not fail").is_ready(), $msg);
}};
}
macro_rules! assert_not_ready {
($svc:expr) => {{
assert_not_ready!($svc, "must not be ready");
}};
($svc:expr, $msg:expr) => {{
assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg);
}};
}
#[test]
fn basic() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new().build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
// send a request to the one backing service
let mut fut = pool.call(());
with_task(|| {
assert_fut_not_ready!(fut);
});
svc1.next_request().unwrap().1.send_response("foobar");
with_task(|| {
assert_fut_ready!(fut, "foobar");
});
}
#[test]
fn high_load() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new()
.urgency(1.0) // so _any_ NotReady will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.max_services(Some(2))
.build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
svc1.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
// make the one backing service not ready
let mut fut1 = pool.call(());
// if we poll_ready again, pool should notice that load is increasing
// since urgency == 1.0, it should immediately enter high load
with_task(|| {
assert_not_ready!(pool);
});
// it should ask the maker for another service, so we give it one
let (svc2_m, mut svc2) = mock::pair();
svc2.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again for one more request
with_task(|| {
assert_ready!(pool);
});
let mut fut2 = pool.call(());
with_task(|| {
assert_not_ready!(pool);
});
// the pool should _not_ try to add another service
// since we have max_services(2)
with_task(|| {
assert!(!handle.poll_request().unwrap().is_ready());
});
// let's see that each service got one request
svc1.next_request().unwrap().1.send_response("foo");
svc2.next_request().unwrap().1.send_response("bar");
with_task(|| {
assert_fut_ready!(fut1, "foo");
});
with_task(|| {
assert_fut_ready!(fut2, "bar");
});
}
#[test]
fn low_load() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new()
.urgency(1.0) // so any event will change the service count
.build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
svc1.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
// cycling a request should now work
let mut fut = pool.call(());
svc1.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
// and pool should now not be ready (since svc1 isn't ready)
// it should immediately try to add another service
// which we give it
with_task(|| {
assert_not_ready!(pool);
});
let (svc2_m, mut svc2) = mock::pair();
svc2.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc2_m, 0));
// pool is now ready
// which (because of urgency == 1.0) should immediately cause it to drop a service
// it'll drop svc1, so it'll still be ready
with_task(|| {
assert_ready!(pool);
});
// and even with another ready, it won't drop svc2 since it's now the only service
with_task(|| {
assert_ready!(pool);
});
// cycling a request should now work on svc2
let mut fut = pool.call(());
svc2.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
// and again (still svc2)
svc2.allow(1);
with_task(|| {
assert_ready!(pool);
});
let mut fut = pool.call(());
svc2.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
}
#[test]
fn failing_service() {
// start the pool
let (mock, mut handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
let mut pool = Builder::new()
.urgency(1.0) // so _any_ NotReady will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.build(mock, ());
with_task(|| {
assert_not_ready!(pool);
});
// give the pool a backing service
let (svc1_m, mut svc1) = mock::pair();
svc1.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc1_m, 0));
with_task(|| {
assert_ready!(pool);
});
// one request-response cycle
let mut fut = pool.call(());
svc1.next_request().unwrap().1.send_response("foo");
with_task(|| {
assert_fut_ready!(fut, "foo");
});
// now make svc1 fail, so it has to be removed
svc1.send_error("ouch");
// polling now should recognize the failed service,
// try to create a new one, and then realize the maker isn't ready
with_task(|| {
assert_not_ready!(pool);
});
// then we release another service
let (svc2_m, mut svc2) = mock::pair();
svc2.allow(1);
handle
.next_request()
.unwrap()
.1
.send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again
with_task(|| {
assert_ready!(pool);
});
// and a cycle should work (and go through svc2)
let mut fut = pool.call(());
svc2.next_request().unwrap().1.send_response("bar");
with_task(|| {
assert_fut_ready!(fut, "bar");
});
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::lazy;
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}