balance: Only balance over ready endpoints (#293)

In 03ec4aa, the balancer was changed to make a quick endpoint decision.
This, however, means that the balancer can return NotReady when it does
in fact have a ready endpoint.

This changes the balancer to separate unready endpoints, only
performing p2c over ready endpoints. Unready endpoints are tracked with
a FuturesUnordered that supports eviction via oneshots.

The main downside of this change is that the Balancer must become
generic over the Request type.
This commit is contained in:
Oliver Gould 2019-07-05 20:46:33 -07:00 committed by GitHub
parent 67a11f27ff
commit 18b30eb70e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 267 additions and 183 deletions

View File

@ -27,6 +27,7 @@ futures = "0.1.26"
indexmap = "1.0.2" indexmap = "1.0.2"
log = "0.4.1" log = "0.4.1"
rand = "0.6.5" rand = "0.6.5"
tokio-sync = "0.1.3"
tokio-timer = "0.2.4" tokio-timer = "0.2.4"
tower-discover = "0.1.0" tower-discover = "0.1.0"
tower-layer = "0.1.0" tower-layer = "0.1.0"
@ -41,7 +42,7 @@ hdrhistogram = "6.0"
quickcheck = { version = "0.6", default-features = false } quickcheck = { version = "0.6", default-features = false }
tokio = "0.1.7" tokio = "0.1.7"
tokio-executor = "0.1.2" tokio-executor = "0.1.2"
tower = { version = "0.1", path = "../tower" } tower = { version = "*", path = "../tower" }
tower-buffer = { version = "0.1", path = "../tower-buffer" } tower-buffer = { version = "*", path = "../tower-buffer" }
tower-limit = { version = "0.1", path = "../tower-limit" } tower-limit = { version = "*", path = "../tower-limit" }
tower-test = { version = "0.1.0", path = "../tower-test" } tower-test = { version = "*", path = "../tower-test" }

View File

@ -131,12 +131,13 @@ fn gen_disco() -> impl Discover<
) )
} }
fn run<D>(name: &'static str, lb: lb::p2c::Balance<D>) -> impl Future<Item = (), Error = ()> fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>) -> impl Future<Item = (), Error = ()>
where where
D: Discover + Send + 'static, D: Discover + Send + 'static,
D::Error: Into<Error>, D::Error: Into<Error>,
D::Key: Send, D::Key: Clone + Send,
D::Service: Service<Req, Response = Rsp, Error = Error> + load::Load + Send, D::Service: Service<Req, Response = Rsp> + load::Load + Send,
<D::Service as Service<Req>>::Error: Into<Error>,
<D::Service as Service<Req>>::Future: Send, <D::Service as Service<Req>>::Future: Send,
<D::Service as load::Load>::Metric: std::fmt::Debug, <D::Service as load::Load>::Metric: std::fmt::Debug,
{ {

View File

@ -5,12 +5,12 @@ use tower_layer::Layer;
/// Efficiently distributes requests across an arbitrary number of services /// Efficiently distributes requests across an arbitrary number of services
#[derive(Clone)] #[derive(Clone)]
pub struct BalanceLayer<D> { pub struct BalanceLayer<D, Req> {
rng: SmallRng, rng: SmallRng,
_marker: PhantomData<fn(D)>, _marker: PhantomData<fn(D, Req)>,
} }
impl<D> BalanceLayer<D> { impl<D, Req> BalanceLayer<D, Req> {
/// Builds a balancer using the system entropy. /// Builds a balancer using the system entropy.
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -31,15 +31,15 @@ impl<D> BalanceLayer<D> {
} }
} }
impl<S> Layer<S> for BalanceLayer<S> { impl<S, Req> Layer<S> for BalanceLayer<S, Req> {
type Service = BalanceMake<S>; type Service = BalanceMake<S, Req>;
fn layer(&self, make_discover: S) -> Self::Service { fn layer(&self, make_discover: S) -> Self::Service {
BalanceMake::new(make_discover, self.rng.clone()) BalanceMake::new(make_discover, self.rng.clone())
} }
} }
impl<D> fmt::Debug for BalanceLayer<D> { impl<D, Req> fmt::Debug for BalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BalanceLayer") f.debug_struct("BalanceLayer")
.field("rng", &self.rng) .field("rng", &self.rng)

View File

@ -1,25 +1,32 @@
use super::Balance; use super::Balance;
use futures::{try_ready, Future, Poll}; use futures::{try_ready, Future, Poll};
use rand::{rngs::SmallRng, FromEntropy}; use rand::{rngs::SmallRng, FromEntropy};
use std::marker::PhantomData;
use tower_discover::Discover; use tower_discover::Discover;
use tower_service::Service; use tower_service::Service;
/// Makes `Balancer`s given an inner service that makes `Discover`s. /// Makes `Balancer`s given an inner service that makes `Discover`s.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct BalanceMake<S> { pub struct BalanceMake<S, Req> {
inner: S, inner: S,
rng: SmallRng, rng: SmallRng,
_marker: PhantomData<fn(Req)>,
} }
/// Makes a balancer instance. /// Makes a balancer instance.
pub struct MakeFuture<F> { pub struct MakeFuture<F, Req> {
inner: F, inner: F,
rng: SmallRng, rng: SmallRng,
_marker: PhantomData<fn(Req)>,
} }
impl<S> BalanceMake<S> { impl<S, Req> BalanceMake<S, Req> {
pub(crate) fn new(inner: S, rng: SmallRng) -> Self { pub(crate) fn new(inner: S, rng: SmallRng) -> Self {
Self { inner, rng } Self {
inner,
rng,
_marker: PhantomData,
}
} }
/// Initializes a P2C load balancer from the OS's entropy source. /// Initializes a P2C load balancer from the OS's entropy source.
@ -28,14 +35,15 @@ impl<S> BalanceMake<S> {
} }
} }
impl<S, Target> Service<Target> for BalanceMake<S> impl<S, Target, Req> Service<Target> for BalanceMake<S, Req>
where where
S: Service<Target>, S: Service<Target>,
S::Response: Discover, S::Response: Discover,
<S::Response as Discover>::Service: Service<Req>,
{ {
type Response = Balance<S::Response>; type Response = Balance<S::Response, Req>;
type Error = S::Error; type Error = S::Error;
type Future = MakeFuture<S::Future>; type Future = MakeFuture<S::Future, Req>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready() self.inner.poll_ready()
@ -45,16 +53,18 @@ where
MakeFuture { MakeFuture {
inner: self.inner.call(target), inner: self.inner.call(target),
rng: self.rng.clone(), rng: self.rng.clone(),
_marker: PhantomData,
} }
} }
} }
impl<F> Future for MakeFuture<F> impl<F, Req> Future for MakeFuture<F, Req>
where where
F: Future, F: Future,
F::Item: Discover, F::Item: Discover,
<F::Item as Discover>::Service: Service<Req>,
{ {
type Item = Balance<F::Item>; type Item = Balance<F::Item, Req>;
type Error = F::Error; type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

View File

@ -1,11 +1,13 @@
use crate::error; use crate::error;
use futures::{future, try_ready, Async, Future, Poll}; use futures::{future, stream, try_ready, Async, Future, Poll, Stream};
use indexmap::IndexMap; use indexmap::IndexMap;
use log::{debug, info, trace}; use log::{debug, trace};
use rand::{rngs::SmallRng, FromEntropy}; use rand::{rngs::SmallRng, FromEntropy};
use tokio_sync::oneshot;
use tower_discover::{Change, Discover}; use tower_discover::{Change, Discover};
use tower_load::Load; use tower_load::Load;
use tower_service::Service; use tower_service::Service;
use tower_util::Ready;
/// Distributes requests across inner services using the [Power of Two Choices][p2c]. /// Distributes requests across inner services using the [Power of Two Choices][p2c].
/// ///
@ -21,26 +23,50 @@ use tower_service::Service;
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded /// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
#[derive(Debug)] #[derive(Debug)]
pub struct Balance<D: Discover> { pub struct Balance<D: Discover, Req> {
discover: D, discover: D,
endpoints: IndexMap<D::Key, D::Service>, ready_services: IndexMap<D::Key, D::Service>,
unready_services: stream::FuturesUnordered<UnreadyService<D::Key, D::Service, Req>>,
cancelations: IndexMap<D::Key, oneshot::Sender<()>>,
/// Holds an index into `endpoints`, indicating the service that has been /// Holds an index into `endpoints`, indicating the service that has been
/// chosen to dispatch the next request. /// chosen to dispatch the next request.
ready_index: Option<usize>, next_ready_index: Option<usize>,
rng: SmallRng, rng: SmallRng,
} }
impl<D: Discover> Balance<D> { /// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is removed from discovery.
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
cancel: oneshot::Receiver<()>,
ready: tower_util::Ready<S, Req>,
}
enum Error<E> {
Inner(E),
Canceled,
}
impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Service: Service<Req>,
{
/// Initializes a P2C load balancer from the provided randomization source. /// Initializes a P2C load balancer from the provided randomization source.
pub fn new(discover: D, rng: SmallRng) -> Self { pub fn new(discover: D, rng: SmallRng) -> Self {
Self { Self {
rng, rng,
discover, discover,
endpoints: IndexMap::default(), ready_services: IndexMap::default(),
ready_index: None, cancelations: IndexMap::default(),
unready_services: stream::FuturesUnordered::new(),
next_ready_index: None,
} }
} }
@ -51,33 +77,81 @@ impl<D: Discover> Balance<D> {
/// Returns the number of endpoints currently tracked by the balancer. /// Returns the number of endpoints currently tracked by the balancer.
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.endpoints.len() self.ready_services.len() + self.unready_services.len()
} }
// XXX `pool::Pool` requires direct access to this... Not ideal. // XXX `pool::Pool` requires direct access to this... Not ideal.
pub(crate) fn discover_mut(&mut self) -> &mut D { pub(crate) fn discover_mut(&mut self) -> &mut D {
&mut self.discover &mut self.discover
} }
}
impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
<D::Service as Service<Req>>::Error: Into<error::Error>,
{
/// Polls `discover` for updates, adding new items to `not_ready`. /// Polls `discover` for updates, adding new items to `not_ready`.
/// ///
/// Removals may alter the order of either `ready` or `not_ready`. /// Removals may alter the order of either `ready` or `not_ready`.
fn poll_discover(&mut self) -> Poll<(), error::Discover> fn poll_discover(&mut self) -> Poll<(), error::Discover> {
where
D::Error: Into<error::Error>,
{
debug!("updating from discover"); debug!("updating from discover");
loop { loop {
match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) {
Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)), Change::Remove(key) => {
Change::Remove(rm_key) => { trace!("remove");
// Update the ready index to account for reordering of endpoints. self.evict(&key)
if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { }
self.ready_index = self Change::Insert(key, svc) => {
.ready_index trace!("insert");
.and_then(|i| Self::repair_index(i, rm_idx, self.endpoints.len())); self.evict(&key);
} self.push_unready(key, svc);
}
}
}
}
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
let (tx, rx) = oneshot::channel();
self.cancelations.insert(key.clone(), tx);
self.unready_services.push(UnreadyService {
key: Some(key),
ready: Ready::new(svc),
cancel: rx,
});
}
fn evict(&mut self, key: &D::Key) {
// Update the ready index to account for reordering of ready.
if let Some((idx, _, _)) = self.ready_services.swap_remove_full(key) {
self.next_ready_index = self
.next_ready_index
.and_then(|i| Self::repair_index(i, idx, self.ready_services.len()));
debug_assert!(!self.cancelations.contains_key(key));
} else if let Some(cancel) = self.cancelations.remove(key) {
let _ = cancel.send(());
}
}
fn poll_unready(&mut self) {
loop {
match self.unready_services.poll() {
Ok(Async::NotReady) | Ok(Async::Ready(None)) => return,
Ok(Async::Ready(Some((key, svc)))) => {
trace!("endpoint ready");
let _cancel = self.cancelations.remove(&key);
debug_assert!(_cancel.is_some(), "missing cancelation");
self.ready_services.insert(key, svc);
}
Err((key, Error::Canceled)) => debug_assert!(!self.cancelations.contains_key(&key)),
Err((key, Error::Inner(e))) => {
debug!("dropping failed endpoint: {:?}", e.into());
let _cancel = self.cancelations.swap_remove(&key);
debug_assert!(_cancel.is_some());
} }
} }
} }
@ -106,120 +180,86 @@ impl<D: Discover> Balance<D> {
} }
/// Performs P2C on inner services to find a suitable endpoint. /// Performs P2C on inner services to find a suitable endpoint.
/// fn p2c_next_ready_index(&mut self) -> Option<usize> {
/// When this function returns NotReady, `ready_index` is unset. When this match self.ready_services.len() {
/// function returns Ready, `ready_index` is set with an index into 0 => None,
/// `endpoints` of a ready endpoint service. 1 => Some(0),
///
/// If `endpoints` is reordered due to removals, `ready_index` is updated via
/// `repair_index()`.
fn poll_ready_index<Svc, Request>(&mut self) -> Poll<usize, Svc::Error>
where
D: Discover<Service = Svc>,
Svc: Service<Request> + Load,
Svc::Error: Into<error::Error>,
Svc::Metric: std::fmt::Debug,
{
match self.endpoints.len() {
0 => Ok(Async::NotReady),
1 => {
// If there's only one endpoint, ignore its load but require that it
// is ready.
match self.poll_endpoint_index_load(0) {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
self.ready_index = Some(0);
Ok(Async::Ready(0))
}
Err(e) => {
info!("evicting failed endpoint: {}", e.into());
let _ = self.endpoints.swap_remove_index(0);
Ok(Async::NotReady)
}
}
}
len => { len => {
// Get two distinct random indexes (in a random order). Poll the // Get two distinct random indexes (in a random order) and
// service at each index. // compare the loads of the service at each index.
//
// If either fails, the service is removed from the set of
// endpoints.
let idxs = rand::seq::index::sample(&mut self.rng, len, 2); let idxs = rand::seq::index::sample(&mut self.rng, len, 2);
let aidx = idxs.index(0); let aidx = idxs.index(0);
let bidx = idxs.index(1); let bidx = idxs.index(1);
debug_assert_ne!(aidx, bidx, "random indices must be distinct"); debug_assert_ne!(aidx, bidx, "random indices must be distinct");
let (aload, bidx) = match self.poll_endpoint_index_load(aidx) { let aload = self.ready_index_load(aidx);
Ok(ready) => (ready, bidx), let bload = self.ready_index_load(bidx);
Err(e) => { let ready = if aload <= bload { aidx } else { bidx };
info!("evicting failed endpoint: {}", e.into());
let _ = self.endpoints.swap_remove_index(aidx);
let new_bidx = Self::repair_index(bidx, aidx, self.endpoints.len())
.expect("random indices must be distinct");
(Async::NotReady, new_bidx)
}
};
let (bload, aidx) = match self.poll_endpoint_index_load(bidx) { trace!(
Ok(ready) => (ready, aidx), "load[{}]={:?}; load[{}]={:?} -> ready={}",
Err(e) => { aidx,
info!("evicting failed endpoint: {}", e.into()); aload,
let _ = self.endpoints.swap_remove_index(bidx); bidx,
let new_aidx = Self::repair_index(aidx, bidx, self.endpoints.len()) bload,
.expect("random indices must be distinct"); ready
(Async::NotReady, new_aidx) );
} Some(ready)
};
trace!("load[{}]={:?}; load[{}]={:?}", aidx, aload, bidx, bload);
let ready = match (aload, bload) {
(Async::Ready(aload), Async::Ready(bload)) => {
if aload <= bload {
Async::Ready(aidx)
} else {
Async::Ready(bidx)
}
}
(Async::Ready(_), Async::NotReady) => Async::Ready(aidx),
(Async::NotReady, Async::Ready(_)) => Async::Ready(bidx),
(Async::NotReady, Async::NotReady) => Async::NotReady,
};
trace!(" -> ready={:?}", ready);
Ok(ready)
} }
} }
} }
/// Accesses an endpoint by index and, if it is ready, returns its current load. /// Accesses a ready endpoint by index and returns its current load.
fn poll_endpoint_index_load<Svc, Request>( fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
&mut self, let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
index: usize, svc.load()
) -> Poll<Svc::Metric, Svc::Error> }
where
D: Discover<Service = Svc>, fn poll_ready_index_or_evict(&mut self, index: usize) -> Poll<(), ()> {
Svc: Service<Request> + Load, let (_, svc) = self
Svc::Error: Into<error::Error>, .ready_services
{ .get_index_mut(index)
let (_, svc) = self.endpoints.get_index_mut(index).expect("invalid index"); .expect("invalid index");
try_ready!(svc.poll_ready());
Ok(Async::Ready(svc.load())) match svc.poll_ready() {
Ok(Async::Ready(())) => Ok(Async::Ready(())),
Ok(Async::NotReady) => {
// became unready; so move it back there.
let (key, svc) = self
.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
self.push_unready(key, svc);
Ok(Async::NotReady)
}
Err(e) => {
// failed, so drop it.
debug!("evicting failed endpoint: {:?}", e.into());
self.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
Err(())
}
}
} }
} }
impl<D, Svc, Request> Service<Request> for Balance<D> impl<D, Req> Service<Req> for Balance<D, Req>
where where
D: Discover<Service = Svc>, D: Discover,
D::Key: Clone,
D::Error: Into<error::Error>, D::Error: Into<error::Error>,
Svc: Service<Request> + Load, D::Service: Service<Req> + Load,
Svc::Error: Into<error::Error>, <D::Service as Load>::Metric: std::fmt::Debug,
Svc::Metric: std::fmt::Debug, <D::Service as Service<Req>>::Error: Into<error::Error>,
{ {
type Response = <D::Service as Service<Request>>::Response; type Response = <D::Service as Service<Req>>::Response;
type Error = error::Error; type Error = error::Error;
type Future = type Future = future::MapErr<
future::MapErr<<D::Service as Service<Request>>::Future, fn(Svc::Error) -> error::Error>; <D::Service as Service<Req>>::Future,
fn(<D::Service as Service<Req>>::Error) -> error::Error,
>;
/// Prepares the balancer to process a request. /// Prepares the balancer to process a request.
/// ///
@ -230,45 +270,74 @@ where
// previously-selected `ready_index` if appropriate. // previously-selected `ready_index` if appropriate.
self.poll_discover()?; self.poll_discover()?;
if let Some(index) = self.ready_index { // Drive new or busy services to readiness.
debug_assert!(index < self.endpoints.len()); self.poll_unready();
// Ensure the selected endpoint is still ready. trace!(
match self.poll_endpoint_index_load(index) { "ready={}; unready={}",
Ok(Async::Ready(_)) => return Ok(Async::Ready(())), self.ready_services.len(),
Ok(Async::NotReady) => {} self.unready_services.len()
Err(e) => { );
drop(self.endpoints.swap_remove_index(index));
info!("evicting failed endpoint: {}", e.into()); loop {
// If a node has already been selected, ensure that it is ready.
// This ensures that the underlying service is ready immediately
// before a request is dispatched to it. If, e.g., a failure
// detector has changed the state of the service, it may be evicted
// from the ready set so that P2C can be performed again.
if let Some(index) = self.next_ready_index {
trace!("preselected ready_index={}", index);
debug_assert!(index < self.ready_services.len());
if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
return Ok(Async::Ready(()));
} }
self.next_ready_index = None;
} }
self.ready_index = None; self.next_ready_index = self.p2c_next_ready_index();
} if self.next_ready_index.is_none() {
debug_assert!(self.ready_services.is_empty());
let tries = match self.endpoints.len() { return Ok(Async::NotReady);
0 => return Ok(Async::NotReady),
1 => 1,
n => n / 2,
};
for _ in 0..tries {
if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? {
trace!("ready: {:?}", idx);
self.ready_index = Some(idx);
return Ok(Async::Ready(()));
} }
} }
trace!("exhausted {} attempts", tries);
Ok(Async::NotReady)
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Req) -> Self::Future {
let index = self.ready_index.take().expect("not ready"); let index = self.next_ready_index.take().expect("not ready");
let (_, svc) = self let (key, mut svc) = self
.endpoints .ready_services
.get_index_mut(index) .swap_remove_index(index)
.expect("invalid ready index"); .expect("invalid ready index");
// no need to repair since the ready_index has been cleared.
svc.call(request).map_err(Into::into) let fut = svc.call(request);
self.push_unready(key, svc);
fut.map_err(Into::into)
}
}
impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
type Item = (K, S);
type Error = (K, Error<S::Error>);
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(())) = self.cancel.poll() {
let key = self.key.take().expect("polled after ready");
return Err((key, Error::Canceled));
}
match self.ready.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(svc)) => {
let key = self.key.take().expect("polled after ready");
Ok((key, svc).into())
}
Err(e) => {
let key = self.key.take().expect("polled after ready");
Err((key, Error::Inner(e)))
}
}
} }
} }

View File

@ -29,7 +29,9 @@ fn empty() {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![]; let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty); let disco = ServiceList::new(empty);
let mut svc = Balance::from_entropy(disco); let mut svc = Balance::from_entropy(disco);
assert_not_ready!(svc); with_task(|| {
assert_not_ready!(svc);
})
} }
#[test] #[test]
@ -64,7 +66,7 @@ fn single_endpoint() {
} }
#[test] #[test]
fn two_endpoints_with_equal_weight() { fn two_endpoints_with_equal_load() {
let (mock_a, mut handle_a) = mock::pair(); let (mock_a, mut handle_a) = mock::pair();
let (mock_b, mut handle_b) = mock::pair(); let (mock_b, mut handle_b) = mock::pair();
let mock_a = load::Constant::new(mock_a, 1); let mock_a = load::Constant::new(mock_a, 1);
@ -101,20 +103,21 @@ fn two_endpoints_with_equal_weight() {
handle_a.allow(1); handle_a.allow(1);
handle_b.allow(1); handle_b.allow(1);
assert_ready!(svc, "must be ready when both endpoints are ready"); for _ in 0..2 {
{ assert_ready!(svc, "must be ready when both endpoints are ready");
let fut = svc.call(()); let fut = svc.call(());
for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] { for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() { if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() {
log::info!("using {}", c);
tx.send_response(c); tx.send_response(c);
h.allow(0);
} }
} }
fut.wait().expect("call must complete"); fut.wait().expect("call must complete");
} }
handle_a.send_error("endpoint lost"); handle_a.send_error("endpoint lost");
handle_b.allow(1); assert_not_ready!(svc, "must be not be ready");
assert_ready!(svc, "must be ready after one endpoint is removed");
assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",); assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",);
}); });
} }

View File

@ -238,7 +238,7 @@ where
MS::Error: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static,
Target: Clone, Target: Clone,
{ {
balance: Balance<PoolDiscoverer<MS, Target, Request>>, balance: Balance<PoolDiscoverer<MS, Target, Request>, Request>,
options: Builder, options: Builder,
ewma: f64, ewma: f64,
} }
@ -263,18 +263,18 @@ where
} }
} }
impl<MS, Target, Request> Service<Request> for Pool<MS, Target, Request> impl<MS, Target, Req> Service<Req> for Pool<MS, Target, Req>
where where
MS: MakeService<Target, Request>, MS: MakeService<Target, Req>,
MS::Service: Load, MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug, <MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static, MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static,
Target: Clone, Target: Clone,
{ {
type Response = <Balance<PoolDiscoverer<MS, Target, Request>> as Service<Request>>::Response; type Response = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
type Error = <Balance<PoolDiscoverer<MS, Target, Request>> as Service<Request>>::Error; type Error = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error;
type Future = <Balance<PoolDiscoverer<MS, Target, Request>> as Service<Request>>::Future; type Future = <Balance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if let Async::Ready(()) = self.balance.poll_ready()? { if let Async::Ready(()) = self.balance.poll_ready()? {
@ -318,7 +318,7 @@ where
Ok(Async::NotReady) Ok(Async::NotReady)
} }
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, req: Req) -> Self::Future {
self.balance.call(req) self.balance.call(req)
} }
} }