balance: Specialize the balancer for P2C (#288)

As described in #286, `Balance` had a few problems:
- it is responsible for driving all inner services to readiness, making
  its `poll_ready` O(n) and not O(1);
- the `choose` abstraction was a hindrance. If a round-robin balancer
  is needed it can be implemented separately without much duplicate
  code; and
- endpoint errors were considered fatal to the balancer.

This change replaces `Balance` with `p2c::Balance` and removes the
`choose` module.

Endpoint service failures now cause the service to be removed from the
balancer gracefully.

Endpoint selection is now effectively constant time, though it biases
for availability in the case when random selection does not yield an
available endpoint.

`tower-test` had to be updated so that a mocked service could fail after
advertising readiness.
This commit is contained in:
Oliver Gould 2019-06-04 13:59:47 -07:00 committed by GitHub
parent 313530c875
commit 03ec4aafa8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 601 additions and 735 deletions

View File

@ -29,6 +29,7 @@ log = "0.4.1"
rand = "0.6.5"
tokio-timer = "0.2.4"
tower-discover = "0.1.0"
tower-layer = "0.1.0"
tower-load = { version = "0.1.0", path = "../tower-load" }
tower-service = "0.2.0"
tower-util = "0.1.0"
@ -43,3 +44,4 @@ tokio-executor = "0.1.2"
tower = { version = "0.1", path = "../tower" }
tower-buffer = { version = "0.1", path = "../tower-buffer" }
tower-limit = { version = "0.1", path = "../tower-limit" }
tower-test = { version = "0.1.0", path = "../tower-test" }

View File

@ -14,7 +14,7 @@ use tower::{
use tower_balance as lb;
use tower_load as load;
const REQUESTS: usize = 50_000;
const REQUESTS: usize = 100_000;
const CONCURRENCY: usize = 500;
const DEFAULT_RTT: Duration = Duration::from_millis(30);
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
@ -55,24 +55,22 @@ fn main() {
let fut = future::lazy(move || {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::Balance::p2c(load::PeakEwmaDiscover::new(
let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
load::NoInstrument,
));
run("P2C+PeakEWMA", pe)
run("P2C+PeakEWMA...", pe)
});
let fut = fut.then(move |_| {
let d = gen_disco();
let ll = lb::Balance::p2c(load::PendingRequestsDiscover::new(d, load::NoInstrument));
run("P2C+LeastLoaded", ll)
});
let fut = fut.and_then(move |_| {
let rr = lb::Balance::round_robin(gen_disco());
run("RoundRobin", rr)
let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(
d,
load::NoInstrument,
));
run("P2C+LeastLoaded...", ll)
});
rt.spawn(fut);
@ -133,14 +131,14 @@ fn gen_disco() -> impl Discover<
)
}
fn run<D, C>(name: &'static str, lb: lb::Balance<D, C>) -> impl Future<Item = (), Error = ()>
fn run<D>(name: &'static str, lb: lb::p2c::Balance<D>) -> impl Future<Item = (), Error = ()>
where
D: Discover + Send + 'static,
D::Error: Into<Error>,
D::Key: Send,
D::Service: Service<Req, Response = Rsp, Error = Error> + Send,
D::Service: Service<Req, Response = Rsp, Error = Error> + load::Load + Send,
<D::Service as Service<Req>>::Future: Send,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
<D::Service as load::Load>::Metric: std::fmt::Debug,
{
println!("{}", name);

View File

@ -1,49 +0,0 @@
use indexmap::IndexMap;
mod p2c;
mod round_robin;
pub use self::{p2c::PowerOfTwoChoices, round_robin::RoundRobin};
/// A strategy for choosing nodes.
// TODO hide `K`
pub trait Choose<K, N> {
    /// Returns the index of a replica to be used next.
    ///
    /// `replicas` cannot be empty, so this function must always return a valid index on
    /// [0, replicas.len()-1].
    fn choose(&mut self, replicas: Replicas<K, N>) -> usize;
}
/// Creates a `Replicas` if there are two or more services.
///
/// Returns `Err(TooFew)` when `inner` holds fewer than two services, since a
/// meaningful choice requires at least two candidates.
pub(crate) fn replicas<K, S>(inner: &IndexMap<K, S>) -> Result<Replicas<K, S>, TooFew> {
    if inner.len() < 2 {
        return Err(TooFew);
    }
    Ok(Replicas(inner))
}
/// Indicates that there were not at least two services.
#[derive(Copy, Clone, Debug)]
pub struct TooFew;
/// Holds two or more services.
// TODO hide `K`
pub struct Replicas<'a, K, S>(&'a IndexMap<K, S>);
impl<K, S> Replicas<'_, K, S> {
pub fn len(&self) -> usize {
self.0.len()
}
}
impl<K, S> ::std::ops::Index<usize> for Replicas<'_, K, S> {
    type Output = S;

    /// Returns the service stored at `idx`.
    ///
    /// Panics when `idx` is not a valid position in the underlying map.
    fn index(&self, idx: usize) -> &Self::Output {
        self.0.get_index(idx).expect("out of bounds").1
    }
}

View File

@ -1,108 +0,0 @@
use log::trace;
use rand::{rngs::SmallRng, FromEntropy, Rng};
use crate::{
choose::{Choose, Replicas},
Load,
};
/// Chooses nodes using the [Power of Two Choices][p2c].
///
/// This is a load-aware strategy, so this may only be used to choose over services that
/// implement `Load`.
///
/// As described in the [Finagle Guide][finagle]:
/// > The algorithm randomly picks two nodes from the set of ready endpoints and selects
/// > the least loaded of the two. By repeatedly using this strategy, we can expect a
/// > manageable upper bound on the maximum load of any server.
/// >
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where `n`
/// > is the number of servers in the cluster.
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
#[derive(Debug)]
pub struct PowerOfTwoChoices {
rng: SmallRng,
}
// ==== impl PowerOfTwoChoices ====

impl Default for PowerOfTwoChoices {
    /// Seeds the chooser's RNG from the operating system's entropy source.
    fn default() -> Self {
        Self::new(SmallRng::from_entropy())
    }
}
impl PowerOfTwoChoices {
    /// Builds a chooser around the provided randomization source.
    pub fn new(rng: SmallRng) -> Self {
        Self { rng }
    }

    /// Returns two random, distinct indices into `ready`.
    fn random_pair(&mut self, len: usize) -> (usize, usize) {
        debug_assert!(len >= 2);

        // First index: a uniform draw reduced into [0, len-1].
        let first = self.rng.gen::<usize>() % len;

        // Second index: a nonzero offset on [1, len-1] rotated around `first`,
        // which lands on [first+1, len-1] or [0, first-1] and therefore can
        // never collide with `first`.
        let offset = self.rng.gen::<usize>() % (len - 1) + 1;
        let second = (first + offset) % len;

        debug_assert!(first != second, "random pair must be distinct");
        (first, second)
    }
}
impl<K, L> Choose<K, L> for PowerOfTwoChoices
where
    L: Load,
    L::Metric: PartialOrd + ::std::fmt::Debug,
{
    /// Chooses two distinct nodes at random and compares their load.
    ///
    /// Returns the index of the lesser-loaded node.
    fn choose(&mut self, replicas: Replicas<K, L>) -> usize {
        let (a, b) = self.random_pair(replicas.len());

        let a_load = replicas[a].load();
        let b_load = replicas[b].load();
        trace!(
            "choose node[{a}]={a_load:?} node[{b}]={b_load:?}",
            a = a,
            b = b,
            a_load = a_load,
            b_load = b_load
        );
        // Ties go to `a`, so a winner is picked even when loads are equal.
        if a_load <= b_load {
            a
        } else {
            b
        }
    }
}
#[cfg(test)]
mod tests {
    use quickcheck::*;

    use super::*;

    quickcheck! {
        // `random_pair` must never return equal indices for any `n >= 2`.
        fn distinct_random_pairs(n: usize) -> TestResult {
            if n < 2 {
                return TestResult::discard();
            }

            let mut p2c = PowerOfTwoChoices::default();
            let (a, b) = p2c.random_pair(n);
            TestResult::from_bool(a != b)
        }
    }
}

View File

@ -1,23 +0,0 @@
use crate::choose::{Choose, Replicas};
/// Chooses nodes sequentially.
///
/// This strategy is load-agnostic and may therefore be used to choose over any type of
/// service.
///
/// Note that ordering is not strictly enforced, especially when services are removed by
/// the balancer.
#[derive(Debug, Default)]
pub struct RoundRobin {
/// References the index of the next node to be used.
pos: usize,
}
impl<K, N> Choose<K, N> for RoundRobin {
    /// Returns the index the cursor points at, then advances the cursor by
    /// one position (wrapping around the current replica count).
    fn choose(&mut self, nodes: Replicas<K, N>) -> usize {
        let count = nodes.len();
        let current = self.pos % count;
        self.pos = (current + 1) % count;
        current
    }
}

View File

@ -1,17 +1,20 @@
//! Error types
use std::fmt;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
/// An error returned when the balancer's endpoint discovery stream fails.
#[derive(Debug)]
pub struct Balance(pub(crate) Error);
pub struct Discover(pub(crate) Error);
impl fmt::Display for Balance {
impl fmt::Display for Discover {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "load balancing discover error: {}", self.0)
write!(f, "load balancer discovery error: {}", self.0)
}
}
impl std::error::Error for Balance {
impl std::error::Error for Discover {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&*self.0)
}

View File

@ -1,23 +0,0 @@
use crate::error::Error;
use futures::{Future, Poll};
/// Wraps an endpoint's response future, boxing its error on completion.
pub struct ResponseFuture<F>(F);

impl<F> ResponseFuture<F> {
    /// Wraps `future` so that its error type is erased into `Error`.
    pub(crate) fn new(future: F) -> ResponseFuture<F> {
        ResponseFuture(future)
    }
}

impl<F> Future for ResponseFuture<F>
where
    F: Future,
    F::Error: Into<Error>,
{
    type Item = F::Item;
    type Error = Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.poll() {
            Ok(ready) => Ok(ready),
            Err(e) => Err(e.into()),
        }
    }
}

View File

@ -1,337 +1,11 @@
//! Load balancing middlewares.
#![doc(html_root_url = "https://docs.rs/tower-balance/0.1.0")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
#[cfg(test)]
extern crate quickcheck;
#![deny(warnings)]
use futures::{Async, Poll};
use indexmap::IndexMap;
use log::{debug, trace};
use rand::{rngs::SmallRng, SeedableRng};
use std::fmt;
use tower_discover::Discover;
use tower_load::Load;
use tower_service::Service;
pub mod choose;
pub mod error;
pub mod future;
pub mod p2c;
pub mod pool;
#[cfg(test)]
mod test;
pub use self::{choose::Choose, pool::Pool};
use self::{error::Error, future::ResponseFuture};
/// Balances requests across a set of inner services.
#[derive(Debug)]
pub struct Balance<D: Discover, C> {
    /// Provides endpoints from service discovery.
    discover: D,

    /// Determines which endpoint is ready to be used next.
    choose: C,

    /// Holds an index into `ready`, indicating the service that has been chosen to
    /// dispatch the next request.
    chosen_ready_index: Option<usize>,

    /// Holds an index into `ready`, indicating the service that dispatched the last
    /// request.
    dispatched_ready_index: Option<usize>,

    /// Holds all possibly-available endpoints (i.e. from `discover`).
    ready: IndexMap<D::Key, D::Service>,

    /// Newly-added endpoints that have not yet become ready.
    not_ready: IndexMap<D::Key, D::Service>,
}
// ===== impl Balance =====
impl<D> Balance<D, choose::PowerOfTwoChoices>
where
    D: Discover,
    D::Service: Load,
    <D::Service as Load>::Metric: PartialOrd + fmt::Debug,
{
    /// Chooses services using the [Power of Two Choices][p2c].
    ///
    /// This configuration is preferred when a load metric is known.
    ///
    /// As described in the [Finagle Guide][finagle]:
    ///
    /// > The algorithm randomly picks two services from the set of ready endpoints and
    /// > selects the least loaded of the two. By repeatedly using this strategy, we can
    /// > expect a manageable upper bound on the maximum load of any server.
    /// >
    /// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
    /// > `n` is the number of servers in the cluster.
    ///
    /// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
    /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
    pub fn p2c(discover: D) -> Self {
        Self::new(discover, choose::PowerOfTwoChoices::default())
    }

    /// Initializes a P2C load balancer from the provided randomization source.
    ///
    /// This may be preferable when an application instantiates many balancers.
    pub fn p2c_with_rng<R: rand::Rng>(discover: D, rng: &mut R) -> Result<Self, rand::Error> {
        let rng = SmallRng::from_rng(rng)?;
        Ok(Self::new(discover, choose::PowerOfTwoChoices::new(rng)))
    }
}
impl<D: Discover> Balance<D, choose::RoundRobin> {
    /// Attempts to choose services sequentially.
    ///
    /// This configuration is preferred when no load metric is known.
    pub fn round_robin(discover: D) -> Self {
        Self::new(discover, choose::RoundRobin::default())
    }
}
impl<D, C> Balance<D, C>
where
    D: Discover,
    C: Choose<D::Key, D::Service>,
{
    /// Creates a new balancer.
    pub fn new(discover: D, choose: C) -> Self {
        Self {
            discover,
            choose,
            chosen_ready_index: None,
            dispatched_ready_index: None,
            ready: IndexMap::default(),
            not_ready: IndexMap::default(),
        }
    }

    /// Returns true iff there are ready services.
    ///
    /// This is not authoritative and is only useful after `poll_ready` has been called.
    pub fn is_ready(&self) -> bool {
        !self.ready.is_empty()
    }

    /// Returns true iff there are no ready services.
    ///
    /// This is not authoritative and is only useful after `poll_ready` has been called.
    pub fn is_not_ready(&self) -> bool {
        !self.is_ready()
    }

    /// Counts the number of services considered to be ready.
    ///
    /// This is not authoritative and is only useful after `poll_ready` has been called.
    pub fn num_ready(&self) -> usize {
        self.ready.len()
    }

    /// Counts the number of services not considered to be ready.
    ///
    /// This is not authoritative and is only useful after `poll_ready` has been called.
    pub fn num_not_ready(&self) -> usize {
        self.not_ready.len()
    }
}
impl<D, C> Balance<D, C>
where
D: Discover,
D::Error: Into<Error>,
C: Choose<D::Key, D::Service>,
{
/// Polls `discover` for updates, adding new items to `not_ready`.
///
/// Removals may alter the order of either `ready` or `not_ready`.
fn update_from_discover(&mut self) -> Result<(), error::Balance> {
debug!("updating from discover");
use tower_discover::Change::*;
while let Async::Ready(change) =
self.discover.poll().map_err(|e| error::Balance(e.into()))?
{
match change {
Insert(key, svc) => {
// If the `Insert`ed service is a duplicate of a service already
// in the ready list, remove the ready service first. The new
// service will then be inserted into the not-ready list.
self.ready.remove(&key);
self.not_ready.insert(key, svc);
}
Remove(key) => {
let _ejected = match self.ready.remove(&key) {
None => self.not_ready.remove(&key),
Some(s) => Some(s),
};
// XXX is it safe to just drop the Service? Or do we need some sort of
// graceful teardown?
// TODO: poll_close
}
}
}
Ok(())
}
/// Calls `poll_ready` on all services in `not_ready`.
///
/// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
/// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
fn promote_to_ready<Request>(&mut self) -> Result<(), <D::Service as Service<Request>>::Error>
where
D::Service: Service<Request>,
{
let n = self.not_ready.len();
if n == 0 {
trace!("promoting to ready: not_ready is empty, skipping.");
return Ok(());
}
debug!("promoting to ready: {}", n);
// Iterate through the not-ready endpoints from right to left to prevent removals
// from reordering services in a way that could prevent a service from being polled.
for idx in (0..n).rev() {
let is_ready = {
let (_, svc) = self
.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
svc.poll_ready()?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
debug!("not_ready[{:?}]: promoting to ready", idx);
let (key, svc) = self
.not_ready
.swap_remove_index(idx)
.expect("invalid not_ready index");
self.ready.insert(key, svc);
} else {
debug!("not_ready[{:?}]: not promoting to ready", idx);
}
}
debug!("promoting to ready: done");
Ok(())
}
/// Polls a `ready` service or moves it to `not_ready`.
///
/// If the service exists in `ready` and does not poll as ready, it is moved to
/// `not_ready`, potentially altering the order of `ready` and/or `not_ready`.
fn poll_ready_index<Request>(
&mut self,
idx: usize,
) -> Option<Poll<(), <D::Service as Service<Request>>::Error>>
where
D::Service: Service<Request>,
{
match self.ready.get_index_mut(idx) {
None => return None,
Some((_, svc)) => match svc.poll_ready() {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(e)),
Ok(Async::NotReady) => {}
},
}
let (key, svc) = self
.ready
.swap_remove_index(idx)
.expect("invalid ready index");
self.not_ready.insert(key, svc);
Some(Ok(Async::NotReady))
}
/// Chooses the next service to which a request will be dispatched.
///
/// Ensures that .
fn choose_and_poll_ready<Request>(
&mut self,
) -> Poll<(), <D::Service as Service<Request>>::Error>
where
D::Service: Service<Request>,
{
loop {
let n = self.ready.len();
debug!("choosing from {} replicas", n);
let idx = match n {
0 => return Ok(Async::NotReady),
1 => 0,
_ => {
let replicas = choose::replicas(&self.ready).expect("too few replicas");
self.choose.choose(replicas)
}
};
// XXX Should we handle per-endpoint errors?
if self
.poll_ready_index(idx)
.expect("invalid ready index")?
.is_ready()
{
self.chosen_ready_index = Some(idx);
return Ok(Async::Ready(()));
}
}
}
}
impl<D, C, Svc, Request> Service<Request> for Balance<D, C>
where
    D: Discover<Service = Svc>,
    D::Error: Into<Error>,
    Svc: Service<Request>,
    Svc::Error: Into<Error>,
    C: Choose<D::Key, Svc>,
{
    type Response = Svc::Response;
    type Error = Error;
    type Future = ResponseFuture<Svc::Future>;

    /// Prepares the balancer to process a request.
    ///
    /// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
    /// into `ready` referring to a `Service` that is ready to dispatch a request.
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
        // Clear before `ready` is altered.
        self.chosen_ready_index = None;

        // Before `ready` is altered, check the readiness of the last-used service, moving it
        // to `not_ready` if appropriate.
        if let Some(idx) = self.dispatched_ready_index.take() {
            // XXX Should we handle per-endpoint errors?
            self.poll_ready_index(idx)
                .expect("invalid dispatched ready key")
                .map_err(Into::into)?;
        }

        // Update `not_ready` and `ready`.
        self.update_from_discover()?;
        self.promote_to_ready().map_err(Into::into)?;

        // Choose the next service to be used by `call`.
        self.choose_and_poll_ready().map_err(Into::into)
    }

    /// Dispatches the request to the service selected by the preceding
    /// `poll_ready`, remembering its index so the next `poll_ready` can
    /// re-check that service first.
    fn call(&mut self, request: Request) -> Self::Future {
        let idx = self.chosen_ready_index.take().expect("not ready");
        let (_, svc) = self
            .ready
            .get_index_mut(idx)
            .expect("invalid chosen ready index");
        self.dispatched_ready_index = Some(idx);

        let rsp = svc.call(request);
        ResponseFuture::new(rsp)
    }
}

View File

@ -0,0 +1,48 @@
use super::BalanceMake;
use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Efficiently distributes requests across an arbitrary number of services
#[derive(Clone)]
pub struct BalanceLayer<D> {
rng: SmallRng,
_marker: PhantomData<fn(D)>,
}
impl<D> BalanceLayer<D> {
    /// Builds a balancer using the system entropy.
    pub fn new() -> Self {
        Self {
            rng: SmallRng::from_entropy(),
            _marker: PhantomData,
        }
    }

    /// Builds a balancer from the provided RNG.
    ///
    /// This may be preferable when many balancers are initialized.
    pub fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
        let rng = SmallRng::from_rng(rng)?;
        Ok(Self {
            rng,
            _marker: PhantomData,
        })
    }
}

// A type with a zero-argument `new` should also implement `Default`
// (clippy::new_without_default).
impl<D> Default for BalanceLayer<D> {
    /// Equivalent to [`BalanceLayer::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<S> Layer<S> for BalanceLayer<S> {
    type Service = BalanceMake<S>;

    /// Wraps `make_discover` in a `BalanceMake`, handing it a clone of this
    /// layer's RNG.
    fn layer(&self, make_discover: S) -> Self::Service {
        BalanceMake::new(make_discover, self.rng.clone())
    }
}
// Manual impl: the `PhantomData<fn(D)>` marker need not be shown, and this
// avoids requiring `D: Debug`.
impl<D> fmt::Debug for BalanceLayer<D> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("BalanceLayer")
            .field("rng", &self.rng)
            .finish()
    }
}

View File

@ -0,0 +1,65 @@
use super::Balance;
use futures::{try_ready, Future, Poll};
use rand::{rngs::SmallRng, FromEntropy};
use tower_discover::Discover;
use tower_service::Service;
/// Makes `Balancer`s given an inner service that makes `Discover`s.
#[derive(Clone, Debug)]
pub struct BalanceMake<S> {
    /// Produces a `Discover` for each target.
    inner: S,
    /// Randomization source; cloned into each produced balancer.
    rng: SmallRng,
}
/// Makes a balancer instance.
// Derive `Debug` (when `F: Debug`) for consistency with `BalanceMake`.
#[derive(Debug)]
pub struct MakeFuture<F> {
    /// The in-flight future that will yield a `Discover`.
    inner: F,
    /// Randomization source handed to the produced balancer.
    rng: SmallRng,
}
impl<S> BalanceMake<S> {
    /// Wraps `inner` with the given randomization source.
    pub(crate) fn new(inner: S, rng: SmallRng) -> Self {
        Self { inner, rng }
    }

    /// Initializes a P2C load balancer from the OS's entropy source.
    pub fn from_entropy(make_discover: S) -> Self {
        Self::new(make_discover, SmallRng::from_entropy())
    }
}
impl<S, Target> Service<Target> for BalanceMake<S>
where
    S: Service<Target>,
    S::Response: Discover,
{
    type Response = Balance<S::Response>;
    type Error = S::Error;
    type Future = MakeFuture<S::Future>;

    /// Readiness is delegated to the inner make-service.
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
        self.inner.poll_ready()
    }

    /// Requests a `Discover` from the inner service; the returned future
    /// resolves to a `Balance` wrapping it, seeded with a clone of this
    /// maker's RNG.
    fn call(&mut self, target: Target) -> Self::Future {
        MakeFuture {
            inner: self.inner.call(target),
            rng: self.rng.clone(),
        }
    }
}
impl<F> Future for MakeFuture<F>
where
    F: Future,
    F::Item: Discover,
{
    type Item = Balance<F::Item>;
    type Error = F::Error;

    /// Resolves once the inner future yields a `Discover`, wrapping it in a
    /// new `Balance` seeded with this future's RNG.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let discover = try_ready!(self.inner.poll());
        let balance = Balance::new(discover, self.rng.clone());
        Ok(balance.into())
    }
}

View File

@ -0,0 +1,12 @@
//! A Power-of-Two-Choices Load Balancer
mod layer;
mod make;
mod service;
#[cfg(test)]
mod test;
pub use layer::BalanceLayer;
pub use make::{BalanceMake, MakeFuture};
pub use service::Balance;

View File

@ -0,0 +1,274 @@
use crate::error;
use futures::{future, try_ready, Async, Future, Poll};
use indexmap::IndexMap;
use log::{debug, info, trace};
use rand::{rngs::SmallRng, FromEntropy};
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_service::Service;
/// Distributes requests across inner services using the [Power of Two Choices][p2c].
///
/// As described in the [Finagle Guide][finagle]:
///
/// > The algorithm randomly picks two services from the set of ready endpoints and
/// > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > expect a manageable upper bound on the maximum load of any server.
/// >
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > `n` is the number of servers in the cluster.
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
#[derive(Debug)]
pub struct Balance<D: Discover> {
    /// Provides endpoint updates (insertions and removals).
    discover: D,

    /// All currently-known endpoints, keyed by their discovery key.
    endpoints: IndexMap<D::Key, D::Service>,

    /// Holds an index into `endpoints`, indicating the service that has been
    /// chosen to dispatch the next request.
    ready_index: Option<usize>,

    /// Randomization source used for P2C sampling.
    rng: SmallRng,
}
impl<D: Discover> Balance<D> {
    /// Initializes a P2C load balancer from the provided randomization source.
    pub fn new(discover: D, rng: SmallRng) -> Self {
        Self {
            rng,
            discover,
            endpoints: IndexMap::default(),
            ready_index: None,
        }
    }

    /// Initializes a P2C load balancer from the OS's entropy source.
    pub fn from_entropy(discover: D) -> Self {
        Self::new(discover, SmallRng::from_entropy())
    }

    /// Returns the number of endpoints currently tracked by the balancer.
    pub fn len(&self) -> usize {
        self.endpoints.len()
    }

    // XXX `pool::Pool` requires direct access to this... Not ideal.
    pub(crate) fn discover_mut(&mut self) -> &mut D {
        &mut self.discover
    }

    /// Polls `discover` for updates, applying insertions and removals to
    /// `endpoints`.
    ///
    /// Removals swap-remove from `endpoints` and may therefore reorder it;
    /// `ready_index` is repaired so it keeps referring to the same service,
    /// or cleared if that service was the one removed.
    fn poll_discover(&mut self) -> Poll<(), error::Discover>
    where
        D::Error: Into<error::Error>,
    {
        debug!("updating from discover");
        loop {
            match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) {
                Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)),
                Change::Remove(rm_key) => {
                    // Update the ready index to account for reordering of endpoints.
                    if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) {
                        self.ready_index = self
                            .ready_index
                            .and_then(|i| Self::repair_index(i, rm_idx, self.endpoints.len()));
                    }
                }
            }
        }
    }

    // Returns the updated index of `orig_idx` after the entry at `rm_idx` was
    // swap-removed from an IndexMap that now contains `new_sz` items.
    //
    // If `orig_idx` is the same as `rm_idx`, None is returned to indicate that
    // index cannot be repaired.
    fn repair_index(orig_idx: usize, rm_idx: usize, new_sz: usize) -> Option<usize> {
        debug_assert!(orig_idx <= new_sz && rm_idx <= new_sz);
        let repaired = match orig_idx {
            i if i == rm_idx => None,         // removed
            i if i == new_sz => Some(rm_idx), // swapped into the removed slot
            i => Some(i),                     // unaffected
        };
        trace!(
            "repair_index: orig={}; rm={}; sz={}; => {:?}",
            orig_idx,
            rm_idx,
            new_sz,
            repaired,
        );
        repaired
    }

    /// Performs P2C on inner services to find a suitable endpoint.
    ///
    /// When this function returns Ready, the returned index refers to a ready
    /// endpoint service in `endpoints`; the caller records it in
    /// `ready_index`. (The single-endpoint path also records it directly.)
    ///
    /// Endpoints that fail while being polled are evicted, so `endpoints` may
    /// be reordered; candidate indices are updated via `repair_index()`.
    fn poll_ready_index<Svc, Request>(&mut self) -> Poll<usize, Svc::Error>
    where
        D: Discover<Service = Svc>,
        Svc: Service<Request> + Load,
        Svc::Error: Into<error::Error>,
        Svc::Metric: std::fmt::Debug,
    {
        match self.endpoints.len() {
            0 => Ok(Async::NotReady),
            1 => {
                // If there's only one endpoint, ignore its load but require that it
                // is ready.
                match self.poll_endpoint_index_load(0) {
                    Ok(Async::NotReady) => Ok(Async::NotReady),
                    Ok(Async::Ready(_)) => {
                        self.ready_index = Some(0);
                        Ok(Async::Ready(0))
                    }
                    Err(e) => {
                        info!("evicting failed endpoint: {}", e.into());
                        let _ = self.endpoints.swap_remove_index(0);
                        Ok(Async::NotReady)
                    }
                }
            }
            len => {
                // Get two distinct random indexes (in a random order). Poll the
                // service at each index.
                //
                // If either fails, the service is removed from the set of
                // endpoints.
                let idxs = rand::seq::index::sample(&mut self.rng, len, 2);

                let aidx = idxs.index(0);
                let bidx = idxs.index(1);
                debug_assert_ne!(aidx, bidx, "random indices must be distinct");

                let (aload, bidx) = match self.poll_endpoint_index_load(aidx) {
                    Ok(ready) => (ready, bidx),
                    Err(e) => {
                        info!("evicting failed endpoint: {}", e.into());
                        let _ = self.endpoints.swap_remove_index(aidx);
                        // The eviction may have moved `b`; repair its index.
                        let new_bidx = Self::repair_index(bidx, aidx, self.endpoints.len())
                            .expect("random indices must be distinct");
                        (Async::NotReady, new_bidx)
                    }
                };

                let (bload, aidx) = match self.poll_endpoint_index_load(bidx) {
                    Ok(ready) => (ready, aidx),
                    Err(e) => {
                        info!("evicting failed endpoint: {}", e.into());
                        let _ = self.endpoints.swap_remove_index(bidx);
                        // The eviction may have moved `a`; repair its index.
                        let new_aidx = Self::repair_index(aidx, bidx, self.endpoints.len())
                            .expect("random indices must be distinct");
                        (Async::NotReady, new_aidx)
                    }
                };
                trace!("load[{}]={:?}; load[{}]={:?}", aidx, aload, bidx, bload);

                // Prefer the lesser-loaded endpoint when both are ready;
                // otherwise bias for whichever one is available.
                let ready = match (aload, bload) {
                    (Async::Ready(aload), Async::Ready(bload)) => {
                        if aload <= bload {
                            Async::Ready(aidx)
                        } else {
                            Async::Ready(bidx)
                        }
                    }
                    (Async::Ready(_), Async::NotReady) => Async::Ready(aidx),
                    (Async::NotReady, Async::Ready(_)) => Async::Ready(bidx),
                    (Async::NotReady, Async::NotReady) => Async::NotReady,
                };
                trace!(" -> ready={:?}", ready);

                Ok(ready)
            }
        }
    }

    /// Accesses an endpoint by index and, if it is ready, returns its current load.
    fn poll_endpoint_index_load<Svc, Request>(
        &mut self,
        index: usize,
    ) -> Poll<Svc::Metric, Svc::Error>
    where
        D: Discover<Service = Svc>,
        Svc: Service<Request> + Load,
        Svc::Error: Into<error::Error>,
    {
        let (_, svc) = self.endpoints.get_index_mut(index).expect("invalid index");
        try_ready!(svc.poll_ready());
        Ok(Async::Ready(svc.load()))
    }
}
impl<D, Svc, Request> Service<Request> for Balance<D>
where
    D: Discover<Service = Svc>,
    D::Error: Into<error::Error>,
    Svc: Service<Request> + Load,
    Svc::Error: Into<error::Error>,
    Svc::Metric: std::fmt::Debug,
{
    type Response = <D::Service as Service<Request>>::Response;
    type Error = error::Error;
    type Future =
        future::MapErr<<D::Service as Service<Request>>::Future, fn(Svc::Error) -> error::Error>;

    /// Prepares the balancer to process a request.
    ///
    /// When `Async::Ready` is returned, `ready_index` is set with a valid index
    /// into `endpoints` referring to a `Service` that is ready to dispatch a request.
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
        // First and foremost, process discovery updates. This removes or updates a
        // previously-selected `ready_index` if appropriate.
        self.poll_discover()?;

        if let Some(index) = self.ready_index {
            debug_assert!(index < self.endpoints.len());
            // Ensure the selected endpoint is still ready.
            match self.poll_endpoint_index_load(index) {
                Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
                Ok(Async::NotReady) => {}
                Err(e) => {
                    drop(self.endpoints.swap_remove_index(index));
                    info!("evicting failed endpoint: {}", e.into());
                }
            }

            // The previously-chosen endpoint is no longer usable; re-select.
            self.ready_index = None;
        }

        // Bound the number of P2C attempts so `poll_ready` stays effectively
        // constant-time, biasing for availability when random selection does
        // not yield an available endpoint.
        let tries = match self.endpoints.len() {
            0 => return Ok(Async::NotReady),
            1 => 1,
            n => n / 2,
        };
        for _ in 0..tries {
            if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? {
                trace!("ready: {:?}", idx);
                self.ready_index = Some(idx);
                return Ok(Async::Ready(()));
            }
        }

        trace!("exhausted {} attempts", tries);
        Ok(Async::NotReady)
    }

    /// Dispatches the request to the endpoint chosen by the preceding
    /// `poll_ready`, clearing `ready_index` so a new choice is made next time.
    fn call(&mut self, request: Request) -> Self::Future {
        let index = self.ready_index.take().expect("not ready");
        let (_, svc) = self
            .endpoints
            .get_index_mut(index)
            .expect("invalid ready index");

        svc.call(request).map_err(Into::into)
    }
}

View File

@ -0,0 +1,125 @@
use futures::{Async, Future};
use tower_discover::ServiceList;
use tower_load as load;
use tower_service::Service;
use tower_test::mock;
use super::*;
// Asserts that `$svc.poll_ready()` succeeds and returns `Async::Ready`.
macro_rules! assert_ready {
    ($svc:expr) => {{
        assert_ready!($svc, "must be ready");
    }};
    ($svc:expr, $msg:expr) => {{
        assert!($svc.poll_ready().expect("must not fail").is_ready(), $msg);
    }};
}
// Asserts that `$svc.poll_ready()` succeeds but returns `Async::NotReady`.
macro_rules! assert_not_ready {
    ($svc:expr) => {{
        assert_not_ready!($svc, "must not be ready");
    }};
    ($svc:expr, $msg:expr) => {{
        assert!(!$svc.poll_ready().expect("must not fail").is_ready(), $msg);
    }};
}
#[test]
fn empty() {
    // A balancer over an empty discovery stream must never report readiness.
    let endpoints: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
    let discover = ServiceList::new(endpoints);
    let mut balancer = Balance::from_entropy(discover);
    assert_not_ready!(balancer);
}
#[test]
fn single_endpoint() {
    let (mock, mut handle) = mock::pair();
    let mock = load::Constant::new(mock, 0);

    let disco = ServiceList::new(vec![mock].into_iter());
    let mut svc = Balance::from_entropy(disco);

    with_task(|| {
        // Not ready until the endpoint advertises capacity.
        handle.allow(0);
        assert_not_ready!(svc);
        assert_eq!(svc.len(), 1, "balancer must have discovered endpoint");

        handle.allow(1);
        assert_ready!(svc);

        let fut = svc.call(());
        let ((), rsp) = handle.next_request().unwrap();
        rsp.send_response(1);
        assert_eq!(fut.wait().expect("call must complete"), 1);

        handle.allow(1);
        assert_ready!(svc);

        // A failure after advertising readiness must evict the endpoint.
        handle.send_error("endpoint lost");
        assert_not_ready!(svc);
        // Use assert_eq! for a useful failure message (and to satisfy clippy).
        assert_eq!(svc.len(), 0, "balancer must drop failed endpoints");
    });
}
#[test]
fn two_endpoints_with_equal_weight() {
    let (mock_a, mut handle_a) = mock::pair();
    let (mock_b, mut handle_b) = mock::pair();
    // Equal constant loads, so selection depends only on readiness.
    let mock_a = load::Constant::new(mock_a, 1);
    let mock_b = load::Constant::new(mock_b, 1);

    let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
    let mut svc = Balance::from_entropy(disco);

    with_task(|| {
        handle_a.allow(0);
        handle_b.allow(0);
        assert_not_ready!(svc);
        assert_eq!(svc.len(), 2, "balancer must have discovered both endpoints");

        // Only `a` has capacity, so the request must go to `a`.
        handle_a.allow(1);
        handle_b.allow(0);
        assert_ready!(svc, "must be ready when one of two services is ready");
        {
            let fut = svc.call(());
            let ((), rsp) = handle_a.next_request().unwrap();
            rsp.send_response("a");
            assert_eq!(fut.wait().expect("call must complete"), "a");
        }

        // Only `b` has capacity, so the request must go to `b`.
        handle_a.allow(0);
        handle_b.allow(1);
        assert_ready!(svc, "must be ready when both endpoints are ready");
        {
            let fut = svc.call(());
            let ((), rsp) = handle_b.next_request().unwrap();
            rsp.send_response("b");
            assert_eq!(fut.wait().expect("call must complete"), "b");
        }

        handle_a.allow(1);
        handle_b.allow(1);
        assert_ready!(svc, "must be ready when both endpoints are ready");
        {
            let fut = svc.call(());
            // Either endpoint may have been chosen; answer whichever one
            // actually received the request.
            for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
                if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() {
                    tx.send_response(c);
                }
            }
            fut.wait().expect("call must complete");
        }

        // A failing endpoint is evicted gracefully; the other remains usable.
        handle_a.send_error("endpoint lost");
        handle_b.allow(1);
        assert_ready!(svc, "must be ready after one endpoint is removed");
        assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",);
    });
}
/// Runs `f` inside a futures 0.1 task context so `poll_*` calls are legal.
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
    futures::future::lazy(move || Ok::<_, ()>(f())).wait().unwrap()
}

View File

@ -14,13 +14,14 @@
//! added or removed.
#![deny(missing_docs)]
use super::{Balance, Choose};
use super::p2c::Balance;
use futures::{try_ready, Async, Future, Poll};
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_service::Service;
use tower_util::MakeService;
enum Load {
enum Level {
/// Load is low -- remove a service instance.
Low,
/// Load is normal -- keep the service set as it is.
@ -38,7 +39,7 @@ where
maker: MS,
making: Option<MS::Future>,
target: Target,
load: Load,
load: Level,
services: usize,
limit: Option<usize>,
}
@ -60,7 +61,7 @@ where
self.making = Some(self.maker.make_service(self.target.clone()));
}
if let Load::High = self.load {
if let Level::High = self.load {
if self.making.is_none() {
if self
.limit
@ -79,7 +80,7 @@ where
if let Some(mut fut) = self.making.take() {
if let Async::Ready(s) = fut.poll()? {
self.services += 1;
self.load = Load::Normal;
self.load = Level::Normal;
return Ok(Async::Ready(Change::Insert(self.services, s)));
} else {
self.making = Some(fut);
@ -88,13 +89,13 @@ where
}
match self.load {
Load::High => {
Level::High => {
unreachable!("found high load but no Service being made");
}
Load::Normal => Ok(Async::NotReady),
Load::Low if self.services == 1 => Ok(Async::NotReady),
Load::Low => {
self.load = Load::Normal;
Level::Normal => Ok(Async::NotReady),
Level::Low if self.services == 1 => Ok(Async::NotReady),
Level::Low => {
self.load = Level::Normal;
let rm = self.services;
self.services -= 1;
Ok(Async::Ready(Change::Remove(rm)))
@ -199,30 +200,30 @@ impl Builder {
}
/// See [`Pool::new`].
pub fn build<C, MS, Target, Request>(
pub fn build<MS, Target, Request>(
&self,
make_service: MS,
target: Target,
choose: C,
) -> Pool<C, MS, Target, Request>
) -> Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
Target: Clone,
C: Choose<usize, MS::Service>,
{
let d = PoolDiscoverer {
maker: make_service,
making: None,
target,
load: Load::Normal,
load: Level::Normal,
services: 0,
limit: self.limit,
};
Pool {
balance: Balance::new(d, choose),
balance: Balance::from_entropy(d),
options: *self,
ewma: self.init,
}
@ -230,48 +231,50 @@ impl Builder {
}
/// A dynamically sized, load-balanced pool of `Service` instances.
pub struct Pool<C, MS, Target, Request>
pub struct Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
Target: Clone,
{
balance: Balance<PoolDiscoverer<MS, Target, Request>, C>,
balance: Balance<PoolDiscoverer<MS, Target, Request>>,
options: Builder,
ewma: f64,
}
impl<C, MS, Target, Request> Pool<C, MS, Target, Request>
impl<MS, Target, Request> Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
Target: Clone,
C: Choose<usize, MS::Service>,
{
/// Construct a new dynamically sized `Pool`.
///
/// If many calls to `poll_ready` return `NotReady`, `new_service` is used to construct another
/// `Service` that is then added to the load-balanced pool. If multiple services are available,
/// `choose` is used to determine which one to use (just as in `Balance`). If many calls to
/// `poll_ready` succeed, the most recently added `Service` is dropped from the pool.
pub fn new(make_service: MS, target: Target, choose: C) -> Self {
Builder::new().build(make_service, target, choose)
/// If many calls to `poll_ready` return `NotReady`, `new_service` is used to
/// construct another `Service` that is then added to the load-balanced pool.
/// If many calls to `poll_ready` succeed, the most recently added `Service`
/// is dropped from the pool.
pub fn new(make_service: MS, target: Target) -> Self {
Builder::new().build(make_service, target)
}
}
impl<C, MS, Target, Request> Service<Request> for Pool<C, MS, Target, Request>
impl<MS, Target, Request> Service<Request> for Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: ::std::error::Error + Send + Sync + 'static,
MS::Error: ::std::error::Error + Send + Sync + 'static,
Target: Clone,
C: Choose<usize, MS::Service>,
{
type Response = <Balance<PoolDiscoverer<MS, Target, Request>, C> as Service<Request>>::Response;
type Error = <Balance<PoolDiscoverer<MS, Target, Request>, C> as Service<Request>>::Error;
type Future = <Balance<PoolDiscoverer<MS, Target, Request>, C> as Service<Request>>::Future;
type Response = <Balance<PoolDiscoverer<MS, Target, Request>> as Service<Request>>::Response;
type Error = <Balance<PoolDiscoverer<MS, Target, Request>> as Service<Request>>::Error;
type Future = <Balance<PoolDiscoverer<MS, Target, Request>> as Service<Request>>::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if let Async::Ready(()) = self.balance.poll_ready()? {
@ -279,42 +282,43 @@ where
// update ewma with a 0 sample
self.ewma = (1.0 - self.options.alpha) * self.ewma;
let discover = self.balance.discover_mut();
if self.ewma < self.options.low {
self.balance.discover.load = Load::Low;
discover.load = Level::Low;
if self.balance.discover.services > 1 {
if discover.services > 1 {
// reset EWMA so we don't immediately try to remove another service
self.ewma = self.options.init;
}
} else {
self.balance.discover.load = Load::Normal;
discover.load = Level::Normal;
}
Ok(Async::Ready(()))
} else if self.balance.discover.making.is_none() {
return Ok(Async::Ready(()));
}
let discover = self.balance.discover_mut();
if discover.making.is_none() {
// no services are ready -- we're overloaded
// update ewma with a 1 sample
self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma;
if self.ewma > self.options.high {
self.balance.discover.load = Load::High;
discover.load = Level::High;
// don't reset the EWMA -- in theory, poll_ready should now start returning
// `Ready`, so we won't try to launch another service immediately.
// we clamp it to high though in case the # of services is limited.
self.ewma = self.options.high;
} else {
self.balance.discover.load = Load::Normal;
discover.load = Level::Normal;
}
Ok(Async::NotReady)
} else {
// no services are ready, but we're already making another service!
Ok(Async::NotReady)
}
Ok(Async::NotReady)
}
fn call(&mut self, req: Request) -> Self::Future {
Service::call(&mut self.balance, req)
self.balance.call(req)
}
}

View File

@ -1,136 +0,0 @@
use futures::{future, Async, Poll};
use quickcheck::*;
use std::collections::VecDeque;
use tower_discover::Change;
use tower_service::Service;
use crate::*;
type Error = Box<dyn std::error::Error + Send + Sync>;
/// A `Discover` backed by a fixed queue of changes: each poll yields the
/// next queued change, then `NotReady` once the queue is drained.
struct ReluctantDisco(VecDeque<Change<usize, ReluctantService>>);
/// A service that reports `NotReady` a fixed number of times before
/// becoming (and then remaining) ready.
struct ReluctantService {
// Remaining `poll_ready` calls that will still return `NotReady`.
polls_until_ready: usize,
}
impl Discover for ReluctantDisco {
type Key = usize;
type Service = ReluctantService;
type Error = Error;
/// Pops and returns the next queued change; `NotReady` when the queue
/// is empty (the test fixture never errors).
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
let r = self
.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
}
}
impl Service<()> for ReluctantService {
type Response = ();
type Error = Error;
type Future = future::FutureResult<Self::Response, Self::Error>;
/// Counts down `polls_until_ready`; ready only once it reaches zero.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.polls_until_ready == 0 {
return Ok(Async::Ready(()));
}
self.polls_until_ready -= 1;
return Ok(Async::NotReady);
}
/// Calls complete immediately with `()`.
fn call(&mut self, _: ()) -> Self::Future {
future::ok(())
}
}
quickcheck! {
/// Creates a random number of services, each of which must be polled a random
/// number of times before becoming ready. As the balancer is polled, ensure that
/// it does not become ready prematurely and that services are promoted from
/// not_ready to ready.
fn poll_ready(service_tries: Vec<usize>) -> TestResult {
// Stores the number of pending services after each poll_ready call.
let mut pending_at = Vec::new();
let disco = {
let mut changes = VecDeque::new();
// For each service, record (per poll round) how many services are
// still pending, and queue an Insert for a service that needs `n`
// polls before it is ready.
for (i, n) in service_tries.iter().map(|n| *n).enumerate() {
for j in 0..n {
if j == pending_at.len() {
pending_at.push(1);
} else {
pending_at[j] += 1;
}
}
let s = ReluctantService { polls_until_ready: n };
changes.push_back(Change::Insert(i, s));
}
ReluctantDisco(changes)
};
// Final round: everything has become ready, nothing pending.
pending_at.push(0);
let mut balancer = Balance::new(disco, choose::RoundRobin::default());
let services = service_tries.len();
// Expected index the round-robin chooser should pick next.
let mut next_pos = 0;
for pending in pending_at.iter().map(|p| *p) {
assert!(pending <= services);
let ready = services - pending;
// The balancer must report ready iff at least one service is ready.
match balancer.poll_ready() {
Err(_) => return TestResult::error("poll_ready failed"),
Ok(p) => {
if p.is_ready() != (ready > 0) {
return TestResult::failed();
}
}
}
// Internal bookkeeping must agree with our expected counts.
if balancer.num_ready() != ready {
return TestResult::failed();
}
if balancer.num_not_ready() != pending {
return TestResult::failed();
}
if balancer.is_ready() != (ready > 0) {
return TestResult::failed();
}
if balancer.is_not_ready() != (ready == 0) {
return TestResult::failed();
}
// No request was dispatched, so no dispatched index may linger.
if balancer.dispatched_ready_index.is_some() {
return TestResult::failed();
}
if ready == 0 {
if balancer.chosen_ready_index.is_some() {
return TestResult::failed();
}
} else {
// Check that the round-robin chooser is doing its thing:
match balancer.chosen_ready_index {
None => return TestResult::failed(),
Some(idx) => {
if idx != next_pos {
return TestResult::failed();
}
}
}
next_pos = (next_pos + 1) % ready;
}
}
TestResult::passed()
}
}

View File

@ -93,14 +93,14 @@ impl<T, U> Service<T> for Mock<T, U> {
return Err(error::Closed::new().into());
}
if self.can_send {
return Ok(().into());
}
if let Some(e) = state.err_with.take() {
return Err(e);
}
if self.can_send {
return Ok(().into());
}
if state.rem > 0 {
assert!(!state.tasks.contains_key(&self.id));