Extract tower-load from tower-balance (#285)

The tower-balance crate includes the `Load` and `Instrument` traits,
which are likely useful outside of balancers; and certainly have no
tight coupling with any specific balancer implementation. This change
extracts these protocol-agnostic traits into a dedicated crate.

The `Load` trait includes a latency-aware _PeakEWMA_ load strategy as
well as a simple _PendingRequests_ strategy for latency-agnostic
applications.

The `Instrument` trait is used by both of these strategies to track
in-flight requests without knowing protocol details. It is expected that
protocol-specific crates will provide, for instance, HTTP
time-to-first-byte latency strategies.

A default `NoInstrument` implementation tracks the a request until its
response future is satisfied.

This crate should only be published once tower-balance is published.

Part of https://github.com/tower-rs/tower/issues/286
This commit is contained in:
Oliver Gould 2019-05-29 10:32:02 -07:00 committed by GitHub
parent 42f4b7781e
commit a496fbf72c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 147 additions and 325 deletions

View File

@ -9,6 +9,7 @@ members = [
"tower-hedge",
"tower-layer",
"tower-limit",
"tower-load",
"tower-load-shed",
"tower-reconnect",
"tower-retry",

View File

@ -20,6 +20,7 @@ jobs:
- tower-hedge
- tower-layer
- tower-limit
- tower-load
- tower-load-shed
- tower-reconnect
- tower-retry

View File

@ -28,8 +28,9 @@ indexmap = "1.0.2"
log = "0.4.1"
rand = "0.6.5"
tokio-timer = "0.2.4"
tower-service = "0.2.0"
tower-discover = "0.1.0"
tower-load = { version = "0.1.0", path = "../tower-load" }
tower-service = "0.2.0"
tower-util = "0.1.0"
[dev-dependencies]

View File

@ -5,7 +5,6 @@ use futures::{future, stream, Async, Future, Poll, Stream};
use hdrsample::Histogram;
use rand::{self, Rng};
use std::time::{Duration, Instant};
use std::{cmp, hash};
use tokio::{runtime, timer};
use tower::{
discover::{Change, Discover},
@ -13,6 +12,7 @@ use tower::{
Service, ServiceExt,
};
use tower_balance as lb;
use tower_load as load;
const REQUESTS: usize = 50_000;
const CONCURRENCY: usize = 500;
@ -30,7 +30,6 @@ static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
Duration::from_millis(500),
Duration::from_millis(1000),
];
static WEIGHTS: [f64; 10] = [1.0, 1.0, 1.0, 0.5, 1.5, 0.5, 1.5, 1.0, 1.0, 1.0];
struct Summary {
latencies: Histogram<u64>,
@ -50,57 +49,24 @@ fn main() {
print!("{}ms, ", l);
}
println!("]");
print!("WEIGHTS=[");
for w in &WEIGHTS {
print!("{}, ", w);
}
println!("]");
let mut rt = runtime::Runtime::new().unwrap();
// Show weighted behavior first...
let fut = future::lazy(move || {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::Balance::p2c(lb::WithWeighted::from(lb::load::WithPeakEwma::new(
let pe = lb::Balance::p2c(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
lb::load::NoInstrument,
)));
run("P2C+PeakEWMA w/ weights", pe)
});
let fut = fut.then(move |_| {
let d = gen_disco();
let ll = lb::Balance::p2c(lb::WithWeighted::from(lb::load::WithPendingRequests::new(
d,
lb::load::NoInstrument,
)));
run("P2C+LeastLoaded w/ weights", ll)
});
// Then run through standard comparisons...
let fut = fut.then(move |_| {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(
d,
DEFAULT_RTT,
decay,
lb::load::NoInstrument,
load::NoInstrument,
));
run("P2C+PeakEWMA", pe)
});
let fut = fut.then(move |_| {
let d = gen_disco();
let ll = lb::Balance::p2c(lb::load::WithPendingRequests::new(
d,
lb::load::NoInstrument,
));
let ll = lb::Balance::p2c(load::PendingRequestsDiscover::new(d, load::NoInstrument));
run("P2C+LeastLoaded", ll)
});
@ -115,26 +81,7 @@ fn main() {
type Error = Box<dyn std::error::Error + Send + Sync>;
#[derive(Clone, Debug, PartialEq)]
struct Key {
instance: usize,
weight: lb::Weight,
}
impl cmp::Eq for Key {}
impl hash::Hash for Key {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.instance.hash(state);
// Ignore weight.
}
}
impl lb::HasWeight for Key {
fn weight(&self) -> lb::Weight {
self.weight
}
}
type Key = usize;
struct Disco<S>(Vec<(Key, S)>);
@ -163,14 +110,8 @@ fn gen_disco() -> impl Discover<
Disco(
MAX_ENDPOINT_LATENCIES
.iter()
.zip(WEIGHTS.iter())
.enumerate()
.map(|(instance, (latency, weight))| {
let key = Key {
instance,
weight: (*weight).into(),
};
.map(|(instance, latency)| {
let svc = tower::service_fn(move |_| {
let start = Instant::now();
@ -186,7 +127,7 @@ fn gen_disco() -> impl Discover<
})
});
(key, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
})
.collect(),
)

View File

@ -10,25 +10,18 @@ use log::{debug, trace};
use rand::{rngs::SmallRng, SeedableRng};
use std::fmt;
use tower_discover::Discover;
use tower_load::Load;
use tower_service::Service;
pub mod choose;
pub mod error;
pub mod future;
pub mod load;
pub mod pool;
#[cfg(test)]
mod test;
pub use self::{
choose::Choose,
load::{
weight::{HasWeight, Weight, Weighted, WithWeighted},
Load,
},
pool::Pool,
};
pub use self::{choose::Choose, pool::Pool};
use self::{error::Error, future::ResponseFuture};

View File

@ -1,22 +0,0 @@
mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;
pub(crate) mod weight;
pub use self::{
constant::Constant,
instrument::{Instrument, InstrumentFuture, NoInstrument},
peak_ewma::{PeakEwma, WithPeakEwma},
pending_requests::{PendingRequests, WithPendingRequests},
};
/// Exposes a load metric.
///
/// Implementors should choose load values so that lesser-loaded instances return lesser
/// values than higher-load instances.
pub trait Load {
type Metric: PartialOrd;
fn load(&self) -> Self::Metric;
}

View File

@ -1,170 +0,0 @@
use futures::{try_ready, Async, Poll};
use std::ops;
use tower_discover::{Change, Discover};
use tower_service::Service;
use crate::Load;
/// A weight on [0.0, ∞].
///
/// Lesser-weighted nodes receive less traffic than heavier-weighted nodes.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Weight(f64);
/// A Service, that implements Load, that
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Weighted<T> {
inner: T,
weight: Weight,
}
#[derive(Debug)]
pub struct WithWeighted<T>(T);
pub trait HasWeight {
fn weight(&self) -> Weight;
}
// === impl Weighted ===
impl<T: HasWeight> From<T> for Weighted<T> {
fn from(inner: T) -> Self {
let weight = inner.weight();
Self { inner, weight }
}
}
impl<T> HasWeight for Weighted<T> {
fn weight(&self) -> Weight {
self.weight
}
}
impl<T> Weighted<T> {
pub fn new<W: Into<Weight>>(inner: T, w: W) -> Self {
let weight = w.into();
Self { inner, weight }
}
pub fn into_parts(self) -> (T, Weight) {
let Self { inner, weight } = self;
(inner, weight)
}
}
impl<L> Load for Weighted<L>
where
L: Load,
L::Metric: ops::Div<Weight>,
<L::Metric as ops::Div<Weight>>::Output: PartialOrd,
{
type Metric = <L::Metric as ops::Div<Weight>>::Output;
fn load(&self) -> Self::Metric {
self.inner.load() / self.weight
}
}
impl<R, S: Service<R>> Service<R> for Weighted<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, req: R) -> Self::Future {
self.inner.call(req)
}
}
// === impl WithWeighted ===
impl<D> From<D> for WithWeighted<D>
where
D: Discover,
D::Key: HasWeight,
{
fn from(d: D) -> Self {
WithWeighted(d)
}
}
impl<D> Discover for WithWeighted<D>
where
D: Discover,
D::Key: HasWeight,
{
type Key = D::Key;
type Error = D::Error;
type Service = Weighted<D::Service>;
fn poll(&mut self) -> Poll<Change<D::Key, Self::Service>, Self::Error> {
let c = match try_ready!(self.0.poll()) {
Change::Remove(k) => Change::Remove(k),
Change::Insert(k, svc) => {
let w = k.weight();
Change::Insert(k, Weighted::new(svc, w))
}
};
Ok(Async::Ready(c))
}
}
// === impl Weight ===
impl Weight {
pub const MIN: Weight = Weight(0.0);
pub const DEFAULT: Weight = Weight(1.0);
}
impl Default for Weight {
fn default() -> Self {
Weight::DEFAULT
}
}
impl From<f64> for Weight {
fn from(w: f64) -> Self {
if w < 0.0 {
Weight::MIN
} else {
Weight(w)
}
}
}
impl Into<f64> for Weight {
fn into(self) -> f64 {
self.0
}
}
impl ops::Div<Weight> for f64 {
type Output = f64;
fn div(self, Weight(w): Weight) -> f64 {
if w == 0.0 {
::std::f64::INFINITY
} else {
self / w
}
}
}
impl ops::Div<Weight> for usize {
type Output = f64;
fn div(self, w: Weight) -> f64 {
(self as f64) / w
}
}
#[test]
fn div_min() {
assert_eq!(10.0 / Weight::MIN, ::std::f64::INFINITY);
assert_eq!(10 / Weight::MIN, ::std::f64::INFINITY);
assert_eq!(0 / Weight::MIN, ::std::f64::INFINITY);
}

3
tower-load/CHANGELOG.md Normal file
View File

@ -0,0 +1,3 @@
# 0.1.0 (unreleased)
- Initial release

33
tower-load/Cargo.toml Normal file
View File

@ -0,0 +1,33 @@
[package]
name = "tower-load"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-load/0.1.0"
description = """
Strategies for measuring the load of a service
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
publish = false
[dependencies]
futures = "0.1.26"
log = "0.4.1"
tokio-timer = "0.2.4"
tower-service = "0.2.0"
tower-discover = "0.1.0"
[dev-dependencies]
tokio-executor = "0.1.2"

25
tower-load/LICENSE Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

13
tower-load/README.md Normal file
View File

@ -0,0 +1,13 @@
# Tower Load
Provides strategies for measuring a service's load.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,3 +1,5 @@
//! A constant `Load` implementation. Primarily useful for testing.
use futures::{try_ready, Async, Poll};
use tower_discover::{Change, Discover};
use tower_service::Service;
@ -13,6 +15,7 @@ pub struct Constant<T, M> {
// ===== impl Constant =====
impl<T, M: Copy> Constant<T, M> {
/// Wraps a `T`-typed service with a constant `M`-typed load metric.
pub fn new(inner: T, load: M) -> Self {
Self { inner, load }
}

View File

@ -15,10 +15,11 @@ use futures::{try_ready, Future, Poll};
/// multiple messages.
///
/// In many cases, the `Output` type is simply `V`. However, `Instrument` may alter the
/// typein order to instrument it appropriately. For example, an HTTP Instrument may
/// type in order to instrument it appropriately. For example, an HTTP Instrument may
/// modify the body type: so an `Instrument` that takes values of type `http::Response<A>`
/// may output values of type `http::Response<B>`.
pub trait Instrument<H, V>: Clone {
/// The instrumented value type.
type Output;
/// Attaches an `H`-typed handle to a `V`-typed value.
@ -48,6 +49,7 @@ where
F: Future,
I: Instrument<H, F::Item>,
{
/// Wraps a future, instrumenting its value if successful.
pub fn new(instrument: I, handle: H, future: F) -> Self {
InstrumentFuture {
future,

28
tower-load/src/lib.rs Normal file
View File

@ -0,0 +1,28 @@
//! Abstractions and utilties for measuring a service's load.
#![doc(html_root_url = "https://docs.rs/tower-load/0.1.0")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![deny(warnings)]
#![allow(elided_lifetimes_in_paths)]
mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;
pub use self::{
constant::Constant,
instrument::{Instrument, InstrumentFuture, NoInstrument},
peak_ewma::{PeakEwma, PeakEwmaDiscover},
pending_requests::{PendingRequests, PendingRequestsDiscover},
};
/// Exposes a load metric.
pub trait Load {
/// A comparable load metric. Lesser values are "preferable" to greater values.
type Metric: PartialOrd;
/// Obtains a service's load.
fn load(&self) -> Self::Metric;
}

View File

@ -1,7 +1,10 @@
//! A `Load` implementation that PeakEWMA on response latency.
use super::{Instrument, InstrumentFuture, NoInstrument};
use crate::Load;
use futures::{try_ready, Async, Poll};
use log::trace;
use std::{
ops,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
@ -9,10 +12,6 @@ use tokio_timer::clock;
use tower_discover::{Change, Discover};
use tower_service::Service;
use super::{Instrument, InstrumentFuture, NoInstrument};
use crate::{HasWeight, Load, Weight};
/// Wraps an `S`-typed Service with Peak-EWMA load measurement.
///
/// `PeakEwma` implements `Load` with the `Cost` metric that estimates the amount of
@ -46,7 +45,7 @@ pub struct PeakEwma<S, I = NoInstrument> {
}
/// Wraps a `D`-typed stream of discovery updates with `PeakEwma`.
pub struct WithPeakEwma<D, I = NoInstrument> {
pub struct PeakEwmaDiscover<D, I = NoInstrument> {
discover: D,
decay_ns: f64,
default_rtt: Duration,
@ -60,14 +59,14 @@ pub struct WithPeakEwma<D, I = NoInstrument> {
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Cost(f64);
/// Updates `RttEstimate` when dropped.
/// Tracks an in-flight request and updates the RTT-estimate on Drop.
pub struct Handle {
sent_at: Instant,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
}
/// Holds the current RTT estimateand the last time this value was updated.
/// Holds the current RTT estimate and the last time this value was updated.
struct RttEstimate {
update_at: Instant,
rtt_ns: f64,
@ -77,7 +76,7 @@ const NANOS_PER_MILLI: f64 = 1_000_000.0;
// ===== impl PeakEwma =====
impl<D, I> WithPeakEwma<D, I> {
impl<D, I> PeakEwmaDiscover<D, I> {
/// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
@ -91,7 +90,7 @@ impl<D, I> WithPeakEwma<D, I> {
D::Service: Service<Request>,
I: Instrument<Handle, <D::Service as Service<Request>>::Response>,
{
WithPeakEwma {
PeakEwmaDiscover {
discover,
decay_ns: nanos(decay),
default_rtt,
@ -100,7 +99,7 @@ impl<D, I> WithPeakEwma<D, I> {
}
}
impl<D, I> Discover for WithPeakEwma<D, I>
impl<D, I> Discover for PeakEwmaDiscover<D, I>
where
D: Discover,
I: Clone,
@ -110,19 +109,17 @@ where
type Error = D::Error;
fn poll(&mut self) -> Poll<Change<D::Key, Self::Service>, D::Error> {
use self::Change::*;
let change = match try_ready!(self.discover.poll()) {
Insert(k, svc) => {
let s = PeakEwma::new(
Change::Remove(k) => Change::Remove(k),
Change::Insert(k, svc) => {
let peak_ewma = PeakEwma::new(
svc,
self.default_rtt,
self.decay_ns,
self.instrument.clone(),
);
Insert(k, s)
Change::Insert(k, peak_ewma)
}
Remove(k) => Remove(k),
};
Ok(Async::Ready(change))
@ -193,12 +190,6 @@ impl<S, I> Load for PeakEwma<S, I> {
}
}
impl<S: HasWeight, I> HasWeight for PeakEwma<S, I> {
fn weight(&self) -> Weight {
self.service.weight()
}
}
impl<S, I> PeakEwma<S, I> {
fn update_estimate(&self) -> f64 {
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
@ -289,14 +280,6 @@ impl Drop for Handle {
// ===== impl Cost =====
impl ops::Div<Weight> for Cost {
type Output = f64;
fn div(self, w: Weight) -> f64 {
self.0 / w
}
}
// Utility that converts durations to nanos in f64.
//
// Due to a lossy transformation, the maximum value that can be represented is ~585 years,

View File

@ -1,10 +1,11 @@
use futures::{try_ready, Async, Poll};
use std::{ops, sync::Arc};
use tower_discover::{Change, Discover};
use tower_service::Service;
//! A `Load` implementation that uses the count of in-flight requests.
use super::{Instrument, InstrumentFuture, NoInstrument};
use crate::{HasWeight, Load, Weight};
use crate::Load;
use futures::{try_ready, Async, Poll};
use std::sync::Arc;
use tower_discover::{Change, Discover};
use tower_service::Service;
/// Expresses load based on the number of currently-pending requests.
#[derive(Debug)]
@ -21,7 +22,7 @@ struct RefCount(Arc<()>);
/// Wraps `inner`'s services with `PendingRequests`.
#[derive(Debug)]
pub struct WithPendingRequests<D, I = NoInstrument> {
pub struct PendingRequestsDiscover<D, I = NoInstrument> {
discover: D,
instrument: I,
}
@ -30,19 +31,10 @@ pub struct WithPendingRequests<D, I = NoInstrument> {
#[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)]
pub struct Count(usize);
/// Tracks an in-flight request by reference count.
#[derive(Debug)]
pub struct Handle(RefCount);
// ===== impl Count =====
impl ops::Div<Weight> for Count {
type Output = f64;
fn div(self, weight: Weight) -> f64 {
self.0 / weight
}
}
// ===== impl PendingRequests =====
impl<S, I> PendingRequests<S, I> {
@ -68,12 +60,6 @@ impl<S, I> Load for PendingRequests<S, I> {
}
}
impl<S: HasWeight, I> HasWeight for PendingRequests<S, I> {
fn weight(&self) -> Weight {
self.service.weight()
}
}
impl<S, I, Request> Service<Request> for PendingRequests<S, I>
where
S: Service<Request>,
@ -96,9 +82,10 @@ where
}
}
// ===== impl WithPendingRequests =====
// ===== impl PendingRequestsDiscover =====
impl<D, I> WithPendingRequests<D, I> {
impl<D, I> PendingRequestsDiscover<D, I> {
/// Wraps a `Discover``, wrapping all of its services with `PendingRequests`.
pub fn new<Request>(discover: D, instrument: I) -> Self
where
D: Discover,
@ -112,7 +99,7 @@ impl<D, I> WithPendingRequests<D, I> {
}
}
impl<D, I> Discover for WithPendingRequests<D, I>
impl<D, I> Discover for PendingRequestsDiscover<D, I>
where
D: Discover,
I: Clone,