Support weighted balancing (#221)

In order to implement red-line testing, blue-green deployments, and
other operational use cases, many service discovery and routing schemes
support endpoint weighting.

In this iteration, we provide a decorator type, `WeightedLoad`, that may
be used to wrap load-bearing services to alter their load according to a
weight.  The `WithWeighted` type may also be used to wrap `Discover`
implementations, in which case it will wrap all new services with
`WeightedLoad`.
This commit is contained in:
Oliver Gould 2019-04-03 19:59:46 -07:00 committed by GitHub
parent 298fe2cc12
commit aa8d024fc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 324 additions and 56 deletions

View File

@ -27,21 +27,28 @@ use tower_service::Service;
use tower_util::ServiceFn;
const REQUESTS: usize = 50_000;
const CONCURRENCY: usize = 50;
const CONCURRENCY: usize = 500;
const DEFAULT_RTT: Duration = Duration::from_millis(30);
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
Duration::from_millis(1),
Duration::from_millis(10),
Duration::from_millis(5),
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(100),
Duration::from_millis(100),
Duration::from_millis(100),
Duration::from_millis(100),
Duration::from_millis(500),
Duration::from_millis(1000),
];
static WEIGHTS: [f64; 10] = [1.0, 1.0, 1.0, 0.5, 1.5, 0.5, 1.5, 1.0, 1.0, 1.0];
struct Summary {
latencies: Histogram<u64>,
start: Instant,
count_by_instance: [usize; 10],
}
fn main() {
env_logger::init();
@ -55,10 +62,40 @@ fn main() {
print!("{}ms, ", l);
}
println!("]");
print!("WEIGHTS=[");
for w in &WEIGHTS {
print!("{}, ", w);
}
println!("]");
let mut rt = runtime::Runtime::new().unwrap();
// Show weighted behavior first...
let fut = future::lazy(move || {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::Balance::p2c(lb::WithWeighted::from(lb::load::WithPeakEwma::new(
d,
DEFAULT_RTT,
decay,
lb::load::NoInstrument,
)));
run("P2C+PeakEWMA w/ weights", pe)
});
let fut = fut.then(move |_| {
let d = gen_disco();
let ll = lb::Balance::p2c(lb::WithWeighted::from(lb::load::WithPendingRequests::new(
d,
lb::load::NoInstrument,
)));
run("P2C+LeastLoaded w/ weights", ll)
});
// Then run through standard comparisons...
let fut = fut.then(move |_| {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(
@ -70,7 +107,7 @@ fn main() {
run("P2C+PeakEWMA", pe)
});
let fut = fut.and_then(move |_| {
let fut = fut.then(move |_| {
let d = gen_disco();
let ll = lb::Balance::p2c(lb::load::WithPendingRequests::new(
d,
@ -93,24 +130,32 @@ type Error = Box<::std::error::Error + Send + Sync>;
fn gen_disco() -> impl Discover<
Key = usize,
Error = impl Into<Error>,
Service = impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
Service = lb::Weighted<
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
>,
> + Send {
tower_discover::ServiceList::new(MAX_ENDPOINT_LATENCIES.iter().map(|latency| {
let svc = ServiceFn::new(move |_| {
let start = Instant::now();
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
let delay = timer::Delay::new(start + latency);
let svcs = MAX_ENDPOINT_LATENCIES
.iter()
.zip(WEIGHTS.iter())
.enumerate()
.map(|(instance, (latency, weight))| {
let svc = ServiceFn::new(move |_| {
let start = Instant::now();
delay.map(move |_| {
let latency = Instant::now() - start;
Rsp { latency }
})
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
timer::Delay::new(start + latency).map(move |_| {
let latency = start.elapsed();
Rsp { latency, instance }
})
});
let svc = InFlightLimit::new(svc, ENDPOINT_CAPACITY);
lb::Weighted::new(svc, *weight)
});
InFlightLimit::new(svc, ENDPOINT_CAPACITY)
}))
tower_discover::ServiceList::new(svcs)
}
fn run<D, C>(name: &'static str, lb: lb::Balance<D, C>) -> impl Future<Item = (), Error = ()>
@ -123,59 +168,78 @@ where
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
println!("{}", name);
let t0 = Instant::now();
let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64);
let service = InFlightLimit::new(lb, CONCURRENCY);
let responses = service.call_all(requests).unordered();
compute_histo(responses)
.map(move |h| report(&h, t0.elapsed()))
.map_err(|_| {})
compute_histo(responses).map(|s| s.report()).map_err(|_| {})
}
fn compute_histo<S>(times: S) -> impl Future<Item = Histogram<u64>, Error = Error> + 'static
fn compute_histo<S>(times: S) -> impl Future<Item = Summary, Error = Error> + 'static
where
S: Stream<Item = Rsp, Error = Error> + 'static,
{
// The max delay is 2000ms. At 3 significant figures.
let histo = Histogram::<u64>::new_with_max(3_000, 3).unwrap();
times.fold(histo, |mut histo, Rsp { latency }| {
let ms = latency.as_secs() * 1_000;
let ms = ms + u64::from(latency.subsec_nanos()) / 1_000 / 1_000;
histo += ms;
future::ok::<_, Error>(histo)
times.fold(Summary::new(), |mut summary, rsp| {
summary.count(rsp);
Ok(summary) as Result<_, Error>
})
}
fn report(histo: &Histogram<u64>, elapsed: Duration) {
println!(" wall {:4}s", elapsed.as_secs());
if histo.len() < 2 {
return;
impl Summary {
fn new() -> Self {
Self {
// The max delay is 2000ms. At 3 significant figures.
latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
start: Instant::now(),
count_by_instance: [0; 10],
}
}
println!(" p50 {:4}ms", histo.value_at_quantile(0.5));
if histo.len() < 10 {
return;
fn count(&mut self, rsp: Rsp) {
let ms = rsp.latency.as_secs() * 1_000;
let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
self.latencies += ms;
self.count_by_instance[rsp.instance] += 1;
}
println!(" p90 {:4}ms", histo.value_at_quantile(0.9));
if histo.len() < 50 {
return;
}
println!(" p95 {:4}ms", histo.value_at_quantile(0.95));
fn report(&self) {
let mut total = 0;
for c in &self.count_by_instance {
total += c;
}
for (i, c) in self.count_by_instance.into_iter().enumerate() {
let p = *c as f64 / total as f64 * 100.0;
println!(" [{:02}] {:>5.01}%", i, p);
}
if histo.len() < 100 {
return;
}
println!(" p99 {:4}ms", histo.value_at_quantile(0.99));
println!(" wall {:4}s", self.start.elapsed().as_secs());
if histo.len() < 1000 {
return;
if self.latencies.len() < 2 {
return;
}
println!(" p50 {:4}ms", self.latencies.value_at_quantile(0.5));
if self.latencies.len() < 10 {
return;
}
println!(" p90 {:4}ms", self.latencies.value_at_quantile(0.9));
if self.latencies.len() < 50 {
return;
}
println!(" p95 {:4}ms", self.latencies.value_at_quantile(0.95));
if self.latencies.len() < 100 {
return;
}
println!(" p99 {:4}ms", self.latencies.value_at_quantile(0.99));
if self.latencies.len() < 1000 {
return;
}
println!(" p999 {:4}ms", self.latencies.value_at_quantile(0.999));
}
println!(" p999 {:4}ms", histo.value_at_quantile(0.999));
}
#[derive(Debug, Clone)]
@ -184,4 +248,5 @@ struct Req;
#[derive(Debug)]
struct Rsp {
latency: Duration,
instance: usize,
}

View File

@ -29,6 +29,7 @@ pub mod pool;
mod test;
pub use self::choose::Choose;
pub use self::load::weight::{HasWeight, Weight, Weighted, WithWeighted};
pub use self::load::Load;
pub use self::pool::Pool;

View File

@ -18,7 +18,7 @@ impl<T, M: Copy> Constant<T, M> {
}
}
impl<T, M: Copy> Load for Constant<T, M> {
impl<T, M: Copy + PartialOrd> Load for Constant<T, M> {
type Metric = M;
fn load(&self) -> M {

View File

@ -2,6 +2,7 @@ mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;
pub(crate) mod weight;
pub use self::constant::Constant;
pub use self::instrument::{Instrument, InstrumentFuture, NoInstrument};
@ -13,7 +14,7 @@ pub use self::pending_requests::{PendingRequests, WithPendingRequests};
/// Implementors should choose load values so that lesser-loaded instances return lesser
/// values than higher-load instances.
pub trait Load {
type Metric;
type Metric: PartialOrd;
fn load(&self) -> Self::Metric;
}

View File

@ -1,4 +1,5 @@
use futures::{Async, Poll};
use std::ops;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio_timer::clock;
@ -7,7 +8,7 @@ use tower_service::Service;
use super::{Instrument, InstrumentFuture, NoInstrument};
use Load;
use {HasWeight, Load, Weight};
/// Wraps an `S`-typed Service with Peak-EWMA load measurement.
///
@ -189,6 +190,12 @@ impl<S, I> Load for PeakEwma<S, I> {
}
}
impl<S: HasWeight, I> HasWeight for PeakEwma<S, I> {
fn weight(&self) -> Weight {
self.service.weight()
}
}
impl<S, I> PeakEwma<S, I> {
fn update_estimate(&self) -> f64 {
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
@ -277,6 +284,16 @@ impl Drop for Handle {
}
}
// ===== impl Cost =====
impl ops::Div<Weight> for Cost {
type Output = f64;
fn div(self, w: Weight) -> f64 {
self.0 / w
}
}
// Utility that converts durations to nanos in f64.
//
// Due to a lossy transformation, the maximum value that can be represented is ~585 years,

View File

@ -1,10 +1,11 @@
use futures::{Async, Poll};
use std::ops;
use std::sync::Arc;
use tower_discover::{Change, Discover};
use tower_service::Service;
use super::{Instrument, InstrumentFuture, NoInstrument};
use Load;
use {HasWeight, Load, Weight};
/// Expresses load based on the number of currently-pending requests.
#[derive(Debug)]
@ -33,6 +34,16 @@ pub struct Count(usize);
#[derive(Debug)]
pub struct Handle(RefCount);
// ===== impl Count =====
impl ops::Div<Weight> for Count {
type Output = f64;
fn div(self, weight: Weight) -> f64 {
self.0 / weight
}
}
// ===== impl PendingRequests =====
impl<S, I> PendingRequests<S, I> {
@ -58,6 +69,12 @@ impl<S, I> Load for PendingRequests<S, I> {
}
}
impl<S: HasWeight, I> HasWeight for PendingRequests<S, I> {
fn weight(&self) -> Weight {
self.service.weight()
}
}
impl<S, I, Request> Service<Request> for PendingRequests<S, I>
where
S: Service<Request>,

View File

@ -0,0 +1,167 @@
use futures::{Async, Poll};
use std::ops;
use tower_discover::{Change, Discover};
use tower_service::Service;
use Load;
/// A weight on [0.0, ∞].
///
/// Lesser-weighted nodes receive less traffic than heavier-weighted nodes.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Weight(f64);
/// A Service, that implements Load, that
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Weighted<T> {
inner: T,
weight: Weight,
}
#[derive(Debug)]
pub struct WithWeighted<T>(T);
pub trait HasWeight {
fn weight(&self) -> Weight;
}
// === impl Weighted ===
impl<T: HasWeight> From<T> for Weighted<T> {
fn from(inner: T) -> Self {
let weight = inner.weight();
Self { inner, weight }
}
}
impl<T> HasWeight for Weighted<T> {
fn weight(&self) -> Weight {
self.weight
}
}
impl<T> Weighted<T> {
pub fn new<W: Into<Weight>>(inner: T, w: W) -> Self {
let weight = w.into();
Self { inner, weight }
}
pub fn into_parts(self) -> (T, Weight) {
let Self { inner, weight } = self;
(inner, weight)
}
}
impl<L> Load for Weighted<L>
where
L: Load,
L::Metric: ops::Div<Weight>,
<L::Metric as ops::Div<Weight>>::Output: PartialOrd,
{
type Metric = <L::Metric as ops::Div<Weight>>::Output;
fn load(&self) -> Self::Metric {
self.inner.load() / self.weight
}
}
impl<R, S: Service<R>> Service<R> for Weighted<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, req: R) -> Self::Future {
self.inner.call(req)
}
}
// === impl withWeight ===
impl<D> From<D> for WithWeighted<D>
where
D: Discover,
D::Service: HasWeight,
{
fn from(d: D) -> Self {
WithWeighted(d)
}
}
impl<D> Discover for WithWeighted<D>
where
D: Discover,
D::Service: HasWeight,
{
type Key = D::Key;
type Error = D::Error;
type Service = Weighted<D::Service>;
fn poll(&mut self) -> Poll<Change<D::Key, Self::Service>, Self::Error> {
let c = match try_ready!(self.0.poll()) {
Change::Insert(k, svc) => Change::Insert(k, Weighted::from(svc)),
Change::Remove(k) => Change::Remove(k),
};
Ok(Async::Ready(c))
}
}
// === impl Weight ===
impl Weight {
pub const MIN: Weight = Weight(0.0);
pub const DEFAULT: Weight = Weight(1.0);
}
impl Default for Weight {
fn default() -> Self {
Weight::DEFAULT
}
}
impl From<f64> for Weight {
fn from(w: f64) -> Self {
if w < 0.0 {
Weight::MIN
} else {
Weight(w)
}
}
}
impl Into<f64> for Weight {
fn into(self) -> f64 {
self.0
}
}
impl ops::Div<Weight> for f64 {
type Output = f64;
fn div(self, Weight(w): Weight) -> f64 {
if w == 0.0 {
::std::f64::INFINITY
} else {
self / w
}
}
}
impl ops::Div<Weight> for usize {
type Output = f64;
fn div(self, w: Weight) -> f64 {
(self as f64) / w
}
}
#[test]
fn div_min() {
assert_eq!(10.0 / Weight::MIN, ::std::f64::INFINITY);
assert_eq!(10 / Weight::MIN, ::std::f64::INFINITY);
assert_eq!(0 / Weight::MIN, ::std::f64::INFINITY);
}