Spring cleaning for tower::balance (#449)

Noteworthy changes:

 - All constructors now follow the same pattern: `new` uses OS entropy,
   `from_rng` takes a `R: Rng` and seeds the randomness from there.
   `from_rng` is fallible, since randomness generators can be fallible.
 - `BalanceLayer` was renamed to `MakeBalanceLayer`, since it is not
   _really_ a `BalanceLayer`. The name of `BalanceMake` was also
   "normalized" to `MakeBalance`.

Another observation: the `Debug` bound on `Load::Metric` in
`p2c::Balance`, while not particularly onerous, generates really
confusing errors if you forget to include it. And crucially, the error
never points at `Debug` (should we file a compiler issue?), so I pretty
much had to guess my way to that being wrong in the doc example.

It would probably be useful to add a documentation example to
`MakeBalanceLayer` or `MakeBalance` (I suspect just one of them is fine,
since they're basically the same). Since I've never used it, and find it
hard to think of uses for it, it might be good if someone with more
experience with it wrote one.
This commit is contained in:
Jon Gjengset 2020-04-24 13:21:11 -04:00 committed by GitHub
parent 6a25d322b5
commit 1c2d50680a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 162 additions and 57 deletions

View File

@ -58,7 +58,7 @@ async fn main() {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new(
let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
@ -67,7 +67,7 @@ async fn main() {
run("P2C+PeakEWMA...", pe).await;
let d = gen_disco();
let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));

View File

@ -1,8 +1,8 @@
//! Error types
//! Error types for the `tower::balance` middleware.
use std::fmt;
/// An error returned when the balancer's endpoint discovery stream fails.
/// The balancer's endpoint discovery stream failed.
#[derive(Debug)]
pub struct Discover(pub(crate) crate::BoxError);

View File

@ -1,4 +1,57 @@
//! Load balancing middlewares.
//! Middleware that allows balancing load among multiple services.
//!
//! In larger systems, multiple endpoints are often available for a given service. As load
//! increases, you want to ensure that that load is spread evenly across the available services.
//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded
//! service, even when spare capacity is available to handle that request elsewhere.
//!
//! This module provides two pieces of middleware that help with this type of load balancing:
//!
//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust
//! technique for spreading load across services with only inexact load measurements. Use this if
//! the set of available services is not within your control, and you simply want to spread load
//! among that set of services.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//!
//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall
//! current load by tracking successful and unsuccessful calls to `poll_ready`, and uses an
//! exponentially weighted moving average to add (using [`tower::make_service::MakeService`]) or
//! remove (by dropping) services in response to increases or decreases in load. Use this if you
//! are able to dynamically add more service endpoints to the system to handle added load.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! # #[cfg(feature = "load")]
//! # fn warnings_are_errors() {
//! use tower::balance::p2c::Balance;
//! use tower::load::Load;
//! use tower::{Service, ServiceExt};
//! use futures_util::pin_mut;
//! # use futures_core::Stream;
//! # use futures_util::StreamExt;
//!
//! async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
//! where
//! S::Error: Into<tower::BoxError>,
//! # // this bound is pretty unfortunate, and the compiler does _not_ help
//! S::Metric: std::fmt::Debug,
//! {
//! // Spread load evenly across the two services
//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));
//!
//! // Issue all the requests that come in.
//! // Some will go to svc1, some will go to svc2.
//! pin_mut!(reqs);
//! let mut responses = p2c.call_all(reqs);
//! while let Some(rsp) = responses.next().await {
//! // ...
//! }
//! }
//! # }
//! ```
pub mod error;
pub mod p2c;

View File

@ -1,17 +1,27 @@
use super::BalanceMake;
use super::MakeBalance;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Efficiently distributes requests across an arbitrary number of services
/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the
/// "inner" service in response to requests coming from the "outer" service.
///
/// This construction may seem a little odd at first glance. This is not a layer that takes
/// requests and produces responses in the traditional sense. Instead, it is more like
/// [`MakeService`](tower::make_service::MakeService) in that it takes service _descriptors_ (see
/// `Target` on `MakeService`) and produces _services_. Since [`Balance`] spreads requests across a
/// _set_ of services, the inner service should produce a [`Discover`], not just a single
/// [`Service`], given a service descriptor.
///
/// See the [module-level documentation](..) for details on load balancing.
#[derive(Clone)]
pub struct BalanceLayer<D, Req> {
pub struct MakeBalanceLayer<D, Req> {
rng: SmallRng,
_marker: PhantomData<fn(D, Req)>,
}
impl<D, Req> BalanceLayer<D, Req> {
/// Builds a balancer using the system entropy.
impl<D, Req> MakeBalanceLayer<D, Req> {
/// Build balancers using operating system entropy.
pub fn new() -> Self {
Self {
rng: SmallRng::from_entropy(),
@ -19,7 +29,7 @@ impl<D, Req> BalanceLayer<D, Req> {
}
}
/// Builds a balancer from the provided RNG.
/// Build balancers using a seed from the provided random number generator.
///
/// This may be preferable when many balancers are initialized.
pub fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
@ -31,17 +41,17 @@ impl<D, Req> BalanceLayer<D, Req> {
}
}
impl<S, Req> Layer<S> for BalanceLayer<S, Req> {
type Service = BalanceMake<S, Req>;
impl<S, Req> Layer<S> for MakeBalanceLayer<S, Req> {
type Service = MakeBalance<S, Req>;
fn layer(&self, make_discover: S) -> Self::Service {
BalanceMake::new(make_discover, self.rng.clone())
MakeBalance::from_rng(make_discover, self.rng.clone()).expect("SmallRng is infallible")
}
}
impl<D, Req> fmt::Debug for BalanceLayer<D, Req> {
impl<D, Req> fmt::Debug for MakeBalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BalanceLayer")
f.debug_struct("MakeBalanceLayer")
.field("rng", &self.rng)
.finish()
}

View File

@ -2,7 +2,7 @@ use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
@ -12,15 +12,22 @@ use std::{
};
use tower_service::Service;
/// Makes `Balancer`s given an inner service that makes `Discover`s.
/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service.
///
/// This is effectively an implementation of [`MakeService`](tower::make_service::MakeService),
/// except that it forwards the service descriptors (`Target`) to an inner service (`S`), and
/// expects that service to produce a service set in the form of a [`Discover`]. It then wraps the
/// service set in a [`Balance`] before returning it as the "made" service.
///
/// See the [module-level documentation](..) for details on load balancing.
#[derive(Clone, Debug)]
pub struct BalanceMake<S, Req> {
pub struct MakeBalance<S, Req> {
inner: S,
rng: SmallRng,
_marker: PhantomData<fn(Req)>,
}
/// Makes a balancer instance.
/// A [`Balance`] in the making.
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
@ -30,22 +37,30 @@ pub struct MakeFuture<F, Req> {
_marker: PhantomData<fn(Req)>,
}
impl<S, Req> BalanceMake<S, Req> {
pub(crate) fn new(inner: S, rng: SmallRng) -> Self {
impl<S, Req> MakeBalance<S, Req> {
/// Build balancers using operating system entropy.
pub fn new(make_discover: S) -> Self {
Self {
inner,
rng,
inner: make_discover,
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}
/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(make_discover: S) -> Self {
Self::new(make_discover, SmallRng::from_entropy())
/// Build balancers using a seed from the provided random number generator.
///
/// This may be preferable when many balancers are initialized.
pub fn from_rng<R: Rng>(inner: S, rng: R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
inner,
rng,
_marker: PhantomData,
})
}
}
impl<S, Target, Req> Service<Target> for BalanceMake<S, Req>
impl<S, Target, Req> Service<Target> for MakeBalance<S, Req>
where
S: Service<Target>,
S::Response: Discover,
@ -83,7 +98,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.poll(cx))?;
let svc = Balance::new(inner, this.rng.clone());
let svc = Balance::from_rng(inner, this.rng.clone()).expect("SmallRng is infallible");
Poll::Ready(Ok(svc))
}
}

View File

@ -1,4 +1,32 @@
//! A Power-of-Two-Choices Load Balancer
//! This module implements the "[Power of Two Random Choices]" load balancing algorithm.
//!
//! It is a simple but robust technique for spreading load across services with only inexact load
//! measurements. As its name implies, whenever a request comes in, it samples two ready services
//! at random, and issues the request to whichever service is less loaded. How loaded a service is
//! is determined by the return value of [`Load`](tower::load::Load).
//!
//! As described in the [Finagle Guide][finagle]:
//!
//! > The algorithm randomly picks two services from the set of ready endpoints and
//! > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > expect a manageable upper bound on the maximum load of any server.
//! >
//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > `n` is the number of servers in the cluster.
//!
//! The balance service and layer implementations rely on _service discovery_ to provide the
//! underlying set of services to balance requests across. This happens through the
//! [`Discover`](tower::discover::Discover) trait, which is essentially a `Stream` that indicates
//! when services become available or go away. If you have a fixed set of services, consider using
//! [`ServiceList`](tower::discover::ServiceList).
//!
//! Since the load balancer needs to perform _random_ choices, the constructors in this module
//! usually come in two forms: one that uses randomness provided by the operating system, and one
//! that lets you specify the random seed to use. Usually the former is what you'll want, though
//! the latter may come in handy for reproducibility or to reduce reliance on the operating system.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
mod layer;
mod make;
@ -7,6 +35,6 @@ mod service;
#[cfg(test)]
mod test;
pub use layer::BalanceLayer;
pub use make::{BalanceMake, MakeFuture};
pub use layer::MakeBalanceLayer;
pub use make::{MakeBalance, MakeFuture};
pub use service::Balance;

View File

@ -5,7 +5,7 @@ use crate::ready_cache::{error::Failed, ReadyCache};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
@ -18,24 +18,15 @@ use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};
/// Distributes requests across inner services using the [Power of Two Choices][p2c].
/// Efficiently distributes requests across an arbitrary number of services.
///
/// As described in the [Finagle Guide][finagle]:
///
/// > The algorithm randomly picks two services from the set of ready endpoints and
/// > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > expect a manageable upper bound on the maximum load of any server.
/// >
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > `n` is the number of servers in the cluster.
/// See the [module-level documentation](..) for details.
///
/// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement
/// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes
/// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you
/// construct the `Balance` instance. For more details, see [#319].
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin
/// [#319]: https://github.com/tower-rs/tower/issues/319
pub struct Balance<D, Req>
@ -68,10 +59,10 @@ where
}
}
#[pin_project]
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is removed from discovery.
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[pin_project]
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
@ -94,10 +85,10 @@ where
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
/// Initializes a P2C load balancer from the provided randomization source.
pub fn new(discover: D, rng: SmallRng) -> Self {
/// Constructs a load balancer that uses operating system entropy.
pub fn new(discover: D) -> Self {
Self {
rng,
rng: SmallRng::from_entropy(),
discover,
services: ReadyCache::default(),
ready_index: None,
@ -106,9 +97,17 @@ where
}
}
/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(discover: D) -> Self {
Self::new(discover, SmallRng::from_entropy())
/// Constructs a load balancer seeded with the provided random number generator.
pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
rng,
discover,
services: ReadyCache::default(),
ready_index: None,
_req: PhantomData,
})
}
/// Returns the number of endpoints currently tracked by the balancer.

View File

@ -11,7 +11,7 @@ use super::*;
async fn empty() {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = mock::Spawn::new(Balance::from_entropy(disco));
let mut svc = mock::Spawn::new(Balance::new(disco));
assert_pending!(svc.poll_ready());
}
@ -20,7 +20,7 @@ async fn single_endpoint() {
let (mut svc, mut handle) = mock::spawn_with(|s| {
let mock = load::Constant::new(s, 0);
let disco = ServiceList::new(vec![mock].into_iter());
Balance::from_entropy(disco)
Balance::new(disco)
});
handle.allow(0);
@ -61,7 +61,7 @@ async fn two_endpoints_with_equal_load() {
pin_mut!(handle_b);
let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = mock::Spawn::new(Balance::from_entropy(disco));
let mut svc = mock::Spawn::new(Balance::new(disco));
handle_a.allow(0);
handle_b.allow(0);

View File

@ -289,7 +289,7 @@ impl Builder {
};
Pool {
balance: Balance::from_entropy(Box::pin(d)),
balance: Balance::new(Box::pin(d)),
options: *self,
ewma: self.init,
}

View File

@ -34,7 +34,7 @@ impl tower::load::Load for Mock {
fn stress() {
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
let mut cache = Balance::<_, Req>::from_entropy(rx);
let mut cache = Balance::<_, Req>::new(rx);
let mut nready = 0;
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();