diff --git a/tower/examples/tower-balance.rs b/tower/examples/tower-balance.rs index eb891c0..649cbeb 100644 --- a/tower/examples/tower-balance.rs +++ b/tower/examples/tower-balance.rs @@ -58,7 +58,7 @@ async fn main() { let decay = Duration::from_secs(10); let d = gen_disco(); - let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new( + let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new( d, DEFAULT_RTT, decay, @@ -67,7 +67,7 @@ async fn main() { run("P2C+PeakEWMA...", pe).await; let d = gen_disco(); - let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new( + let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new( d, load::CompleteOnResponse::default(), )); diff --git a/tower/src/balance/error.rs b/tower/src/balance/error.rs index d7f38bf..59ddbf5 100644 --- a/tower/src/balance/error.rs +++ b/tower/src/balance/error.rs @@ -1,8 +1,8 @@ -//! Error types +//! Error types for the `tower::balance` middleware. use std::fmt; -/// An error returned when the balancer's endpoint discovery stream fails. +/// The balancer's endpoint discovery stream failed. #[derive(Debug)] pub struct Discover(pub(crate) crate::BoxError); diff --git a/tower/src/balance/mod.rs b/tower/src/balance/mod.rs index 21a3948..96860bc 100644 --- a/tower/src/balance/mod.rs +++ b/tower/src/balance/mod.rs @@ -1,4 +1,57 @@ -//! Load balancing middlewares. +//! Middleware that allows balancing load among multiple services. +//! +//! In larger systems, multiple endpoints are often available for a given service. As load +//! increases, you want to ensure that that load is spread evenly across the available services. +//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded +//! service, even when spare capacity is available to handle that request elsewhere. +//! +//! This module provides two pieces of middleware that helps with this type of load balancing: +//! +//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust +//! technique for spreading load across services with only inexact load measurements. Use this if +//! the set of available services is not within your control, and you simply want to spread load +//! among that set of services. +//! +//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf +//! +//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall +//! current load by tracking successful and unsuccessful calls to `poll_ready`, and uses an +//! exponentially weighted moving average to add (using [`tower::make_service::MakeService`]) or +//! remove (by dropping) services in response to increases or decreases in load. Use this if you +//! are able to dynamically add more service endpoints to the system to handle added load. +//! +//! # Examples +//! +//! ```rust +//! # #[cfg(feature = "util")] +//! # #[cfg(feature = "load")] +//! # fn warnings_are_errors() { +//! use tower::balance::p2c::Balance; +//! use tower::load::Load; +//! use tower::{Service, ServiceExt}; +//! use futures_util::pin_mut; +//! # use futures_core::Stream; +//! # use futures_util::StreamExt; +//! +//! async fn spread + Load>(svc1: S, svc2: S, reqs: impl Stream) +//! where +//! S::Error: Into, +//! # // this bound is pretty unfortunate, and the compiler does _not_ help +//! S::Metric: std::fmt::Debug, +//! { +//! // Spread load evenly across the two services +//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2])); +//! +//! // Issue all the requests that come in. +//! // Some will go to svc1, some will go to svc2. +//! pin_mut!(reqs); +//! let mut responses = p2c.call_all(reqs); +//! while let Some(rsp) = responses.next().await { +//! // ... +//! } +//! } +//! # } +//! ``` pub mod error; pub mod p2c; diff --git a/tower/src/balance/p2c/layer.rs b/tower/src/balance/p2c/layer.rs index a90747a..3ccae58 100644 --- a/tower/src/balance/p2c/layer.rs +++ b/tower/src/balance/p2c/layer.rs @@ -1,17 +1,27 @@ -use super::BalanceMake; +use super::MakeBalance; use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::{fmt, marker::PhantomData}; use tower_layer::Layer; -/// Efficiently distributes requests across an arbitrary number of services +/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the +/// "inner" service in response to requests coming from the "outer" service. +/// +/// This construction may seem a little odd at first glance. This is not a layer that takes +/// requests and produces responses in the traditional sense. Instead, it is more like +/// [`MakeService`](tower::make_service::MakeService) in that it takes service _descriptors_ (see +/// `Target` on `MakeService`) and produces _services_. Since [`Balance`] spreads requests across a +/// _set_ of services, the inner service should produce a [`Discover`], not just a single +/// [`Service`], given a service descriptor. +/// +/// See the [module-level documentation](..) for details on load balancing. #[derive(Clone)] -pub struct BalanceLayer { +pub struct MakeBalanceLayer { rng: SmallRng, _marker: PhantomData, } -impl BalanceLayer { - /// Builds a balancer using the system entropy. +impl MakeBalanceLayer { + /// Build balancers using operating system entropy. pub fn new() -> Self { Self { rng: SmallRng::from_entropy(), @@ -19,7 +29,7 @@ impl BalanceLayer { } } - /// Builds a balancer from the provided RNG. + /// Build balancers using a seed from the provided random number generator. /// /// This may be preferrable when many balancers are initialized. pub fn from_rng(rng: &mut R) -> Result { @@ -31,17 +41,17 @@ impl BalanceLayer { } } -impl Layer for BalanceLayer { - type Service = BalanceMake; +impl Layer for MakeBalanceLayer { + type Service = MakeBalance; fn layer(&self, make_discover: S) -> Self::Service { - BalanceMake::new(make_discover, self.rng.clone()) + MakeBalance::from_rng(make_discover, self.rng.clone()).expect("SmallRng is infallible") } } -impl fmt::Debug for BalanceLayer { +impl fmt::Debug for MakeBalanceLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BalanceLayer") + f.debug_struct("MakeBalanceLayer") .field("rng", &self.rng) .finish() } diff --git a/tower/src/balance/p2c/make.rs b/tower/src/balance/p2c/make.rs index 3f92abf..8712e6e 100644 --- a/tower/src/balance/p2c/make.rs +++ b/tower/src/balance/p2c/make.rs @@ -2,7 +2,7 @@ use super::Balance; use crate::discover::Discover; use futures_core::ready; use pin_project::pin_project; -use rand::{rngs::SmallRng, SeedableRng}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::hash::Hash; use std::marker::PhantomData; use std::{ @@ -12,15 +12,22 @@ use std::{ }; use tower_service::Service; -/// Makes `Balancer`s given an inner service that makes `Discover`s. +/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service. +/// +/// This is effectively an implementation of [`MakeService`](tower::make_service::MakeService), +/// except that it forwards the service descriptors (`Target`) to an inner service (`S`), and +/// expects that service to produce a service set in the form of a [`Discover`]. It then wraps the +/// service set in a [`Balance`] before returning it as the "made" service. +/// +/// See the [module-level documentation](..) for details on load balancing. #[derive(Clone, Debug)] -pub struct BalanceMake { +pub struct MakeBalance { inner: S, rng: SmallRng, _marker: PhantomData, } -/// Makes a balancer instance. +/// A [`Balance`] in the making. #[pin_project] #[derive(Debug)] pub struct MakeFuture { @@ -30,22 +37,30 @@ pub struct MakeFuture { _marker: PhantomData, } -impl BalanceMake { - pub(crate) fn new(inner: S, rng: SmallRng) -> Self { +impl MakeBalance { + /// Build balancers using operating system entropy. + pub fn new(make_discover: S) -> Self { Self { - inner, - rng, + inner: make_discover, + rng: SmallRng::from_entropy(), _marker: PhantomData, } } - /// Initializes a P2C load balancer from the OS's entropy source. - pub fn from_entropy(make_discover: S) -> Self { - Self::new(make_discover, SmallRng::from_entropy()) + /// Build balancers using a seed from the provided random number generator. + /// + /// This may be preferrable when many balancers are initialized. + pub fn from_rng(inner: S, rng: R) -> Result { + let rng = SmallRng::from_rng(rng)?; + Ok(Self { + inner, + rng, + _marker: PhantomData, + }) } } -impl Service for BalanceMake +impl Service for MakeBalance where S: Service, S::Response: Discover, @@ -83,7 +98,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let inner = ready!(this.inner.poll(cx))?; - let svc = Balance::new(inner, this.rng.clone()); + let svc = Balance::from_rng(inner, this.rng.clone()).expect("SmallRng is infallible"); Poll::Ready(Ok(svc)) } } diff --git a/tower/src/balance/p2c/mod.rs b/tower/src/balance/p2c/mod.rs index cf42fd2..870bcef 100644 --- a/tower/src/balance/p2c/mod.rs +++ b/tower/src/balance/p2c/mod.rs @@ -1,4 +1,32 @@ -//! A Power-of-Two-Choices Load Balancer +//! This module implements the "[Power of Two Random Choices]" load balancing algorithm. +//! +//! It is a simple but robust technique for spreading load across services with only inexact load +//! measurements. As its name implies, whenever a request comes in, it samples two ready services +//! at random, and issues the request to whichever service is less loaded. How loaded a service is +//! is determined by the return value of [`Load`](tower::load::Load). +//! +//! As described in the [Finagle Guide][finagle]: +//! +//! > The algorithm randomly picks two services from the set of ready endpoints and +//! > selects the least loaded of the two. By repeatedly using this strategy, we can +//! > expect a manageable upper bound on the maximum load of any server. +//! > +//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where +//! > `n` is the number of servers in the cluster. +//! +//! The balance service and layer implementations rely on _service discovery_ to provide the +//! underlying set of services to balance requests across. This happens through the +//! [`Discover`](tower::discover::Discover) trait, which is essentially a `Stream` that indicates +//! when services become available or go away. If you have a fixed set of services, consider using +//! [`ServiceList`](tower::discover::ServiceList). +//! +//! Since the load balancer needs to perform _random_ choices, the constructors in this module +//! usually come in two forms: one that uses randomness provided by the operating system, and one +//! that lets you specify the random seed to use. Usually the former is what you'll want, though +//! the latter may come in handy for reproducability or to reduce reliance on the operating system. +//! +//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf +//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded mod layer; mod make; @@ -7,6 +35,6 @@ mod service; #[cfg(test)] mod test; -pub use layer::BalanceLayer; -pub use make::{BalanceMake, MakeFuture}; +pub use layer::MakeBalanceLayer; +pub use make::{MakeBalance, MakeFuture}; pub use service::Balance; diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs index 9676a9b..4943fc3 100644 --- a/tower/src/balance/p2c/service.rs +++ b/tower/src/balance/p2c/service.rs @@ -5,7 +5,7 @@ use crate::ready_cache::{error::Failed, ReadyCache}; use futures_core::ready; use futures_util::future::{self, TryFutureExt}; use pin_project::pin_project; -use rand::{rngs::SmallRng, SeedableRng}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::hash::Hash; use std::marker::PhantomData; use std::{ @@ -18,24 +18,15 @@ use tokio::sync::oneshot; use tower_service::Service; use tracing::{debug, trace}; -/// Distributes requests across inner services using the [Power of Two Choices][p2c]. +/// Efficiently distributes requests across an arbitrary number of services. /// -/// As described in the [Finagle Guide][finagle]: -/// -/// > The algorithm randomly picks two services from the set of ready endpoints and -/// > selects the least loaded of the two. By repeatedly using this strategy, we can -/// > expect a manageable upper bound on the maximum load of any server. -/// > -/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where -/// > `n` is the number of servers in the cluster. +/// See the [module-level documentation](..) for details. /// /// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement /// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes /// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you /// construct the `Balance` instance. For more details, see [#319]. /// -/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded -/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf /// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin /// [#319]: https://github.com/tower-rs/tower/issues/319 pub struct Balance @@ -68,10 +59,10 @@ where } } -#[pin_project] /// A Future that becomes satisfied when an `S`-typed service is ready. /// -/// May fail due to cancelation, i.e. if the service is removed from discovery. +/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set. +#[pin_project] #[derive(Debug)] struct UnreadyService { key: Option, @@ -94,10 +85,10 @@ where D::Service: Service, >::Error: Into, { - /// Initializes a P2C load balancer from the provided randomization source. - pub fn new(discover: D, rng: SmallRng) -> Self { + /// Constructs a load balancer that uses operating system entropy. + pub fn new(discover: D) -> Self { Self { - rng, + rng: SmallRng::from_entropy(), discover, services: ReadyCache::default(), ready_index: None, @@ -106,9 +97,17 @@ where } } - /// Initializes a P2C load balancer from the OS's entropy source. - pub fn from_entropy(discover: D) -> Self { - Self::new(discover, SmallRng::from_entropy()) + /// Constructs a load balancer seeded with the provided random number generator. + pub fn from_rng(discover: D, rng: R) -> Result { + let rng = SmallRng::from_rng(rng)?; + Ok(Self { + rng, + discover, + services: ReadyCache::default(), + ready_index: None, + + _req: PhantomData, + }) } /// Returns the number of endpoints currently tracked by the balancer. diff --git a/tower/src/balance/p2c/test.rs b/tower/src/balance/p2c/test.rs index 437b912..404e94f 100644 --- a/tower/src/balance/p2c/test.rs +++ b/tower/src/balance/p2c/test.rs @@ -11,7 +11,7 @@ use super::*; async fn empty() { let empty: Vec, usize>> = vec![]; let disco = ServiceList::new(empty); - let mut svc = mock::Spawn::new(Balance::from_entropy(disco)); + let mut svc = mock::Spawn::new(Balance::new(disco)); assert_pending!(svc.poll_ready()); } @@ -20,7 +20,7 @@ async fn single_endpoint() { let (mut svc, mut handle) = mock::spawn_with(|s| { let mock = load::Constant::new(s, 0); let disco = ServiceList::new(vec![mock].into_iter()); - Balance::from_entropy(disco) + Balance::new(disco) }); handle.allow(0); @@ -61,7 +61,7 @@ async fn two_endpoints_with_equal_load() { pin_mut!(handle_b); let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); - let mut svc = mock::Spawn::new(Balance::from_entropy(disco)); + let mut svc = mock::Spawn::new(Balance::new(disco)); handle_a.allow(0); handle_b.allow(0); diff --git a/tower/src/balance/pool/mod.rs b/tower/src/balance/pool/mod.rs index 07df300..799aaf8 100644 --- a/tower/src/balance/pool/mod.rs +++ b/tower/src/balance/pool/mod.rs @@ -289,7 +289,7 @@ impl Builder { }; Pool { - balance: Balance::from_entropy(Box::pin(d)), + balance: Balance::new(Box::pin(d)), options: *self, ewma: self.init, } diff --git a/tower/tests/balance/main.rs b/tower/tests/balance/main.rs index c23e074..8ca3ce9 100644 --- a/tower/tests/balance/main.rs +++ b/tower/tests/balance/main.rs @@ -34,7 +34,7 @@ impl tower::load::Load for Mock { fn stress() { let mut task = task::spawn(()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); - let mut cache = Balance::<_, Req>::from_entropy(rx); + let mut cache = Balance::<_, Req>::new(rx); let mut nready = 0; let mut services = slab::Slab::<(mock::Handle, bool)>::new();