balance: Configure weights from keys, not services (#281)

* balance: Configure weights from keys, not services

The initial weighted balancing implementation required that the
underlying service implement `HasWeight`.

Practically, this doesn't work that well, since this may force
middlewares to implement this trait as well.

To fix this, we change the type bounds so that _keys_, not services,
must implement `HasWeight`.

This has a drawback, though, in that Weight, which contains a float,
cannot implement `Hash` or `Eq`, which is required by the balancer. This
tradeoff seems manageable, though (and is already addressed in linkerd,
for instance). We should follow-up with a change to alter the internal
representation of Weight to alleviate this.
This commit is contained in:
Oliver Gould 2019-05-09 12:17:43 -07:00 committed by GitHub
parent b9c2fea0fc
commit 9b27863a61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 28 deletions

View File

@ -1,12 +1,17 @@
//! Exercises load balancers with mocked services.
use env_logger;
use futures::{future, stream, Future, Stream};
use futures::{future, stream, Async, Future, Poll, Stream};
use hdrsample::Histogram;
use rand::{self, Rng};
use std::time::{Duration, Instant};
use std::{cmp, hash};
use tokio::{runtime, timer};
use tower::{discover::Discover, limit::concurrency::ConcurrencyLimit, Service, ServiceExt};
use tower::{
discover::{Change, Discover},
limit::concurrency::ConcurrencyLimit,
Service, ServiceExt,
};
use tower_balance as lb;
const REQUESTS: usize = 50_000;
@ -110,35 +115,81 @@ fn main() {
type Error = Box<dyn std::error::Error + Send + Sync>;
#[derive(Clone, Debug, PartialEq)]
struct Key {
instance: usize,
weight: lb::Weight,
}
impl cmp::Eq for Key {}
impl hash::Hash for Key {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.instance.hash(state);
// Ignore weight.
}
}
impl lb::HasWeight for Key {
fn weight(&self) -> lb::Weight {
self.weight
}
}
struct Disco<S>(Vec<(Key, S)>);
impl<S> Discover for Disco<S>
where
S: Service<Req, Response = Rsp, Error = Error>,
{
type Key = Key;
type Service = S;
type Error = Error;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
match self.0.pop() {
Some((k, service)) => Ok(Change::Insert(k, service).into()),
None => Ok(Async::NotReady),
}
}
}
fn gen_disco() -> impl Discover<
Key = usize,
Error = impl Into<Error>,
Service = lb::Weighted<
Key = Key,
Error = Error,
Service = ConcurrencyLimit<
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
>,
> + Send {
let svcs = MAX_ENDPOINT_LATENCIES
.iter()
.zip(WEIGHTS.iter())
.enumerate()
.map(|(instance, (latency, weight))| {
let svc = tower::service_fn(move |_| {
let start = Instant::now();
Disco(
MAX_ENDPOINT_LATENCIES
.iter()
.zip(WEIGHTS.iter())
.enumerate()
.map(|(instance, (latency, weight))| {
let key = Key {
instance,
weight: (*weight).into(),
};
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
let svc = tower::service_fn(move |_| {
let start = Instant::now();
timer::Delay::new(start + latency).map(move |_| {
let latency = start.elapsed();
Rsp { latency, instance }
})
});
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
let svc = ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY);
lb::Weighted::new(svc, *weight)
});
tower_discover::ServiceList::new(svcs)
timer::Delay::new(start + latency)
.map_err(Into::into)
.map(move |_| {
let latency = start.elapsed();
Rsp { latency, instance }
})
});
(key, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
})
.collect(),
)
}
fn run<D, C>(name: &'static str, lb: lb::Balance<D, C>) -> impl Future<Item = (), Error = ()>

View File

@ -79,12 +79,12 @@ impl<R, S: Service<R>> Service<R> for Weighted<S> {
}
}
// === impl withWeight ===
// === impl WithWeighted ===
impl<D> From<D> for WithWeighted<D>
where
D: Discover,
D::Service: HasWeight,
D::Key: HasWeight,
{
fn from(d: D) -> Self {
WithWeighted(d)
@ -94,7 +94,7 @@ where
impl<D> Discover for WithWeighted<D>
where
D: Discover,
D::Service: HasWeight,
D::Key: HasWeight,
{
type Key = D::Key;
type Error = D::Error;
@ -102,8 +102,11 @@ where
fn poll(&mut self) -> Poll<Change<D::Key, Self::Service>, Self::Error> {
let c = match try_ready!(self.0.poll()) {
Change::Insert(k, svc) => Change::Insert(k, Weighted::from(svc)),
Change::Remove(k) => Change::Remove(k),
Change::Insert(k, svc) => {
let w = k.weight();
Change::Insert(k, Weighted::new(svc, w))
}
};
Ok(Async::Ready(c))