balance: Update demo (#79)

In preparation for additional load balancing strategies, the demo is
being updated to allow for richer testing in several important ways:

- Adopt the new `tokio` multithreaded runtime.

- Use `tower-buffer` to drive each simulated endpoint on an independent
  task. This fixes a bug where requests appeared active longer than
  intended (while waiting for the SendRequests task to process responses).

- A top-level concurrency limit has been added (by wrapping the balancer
  in `tower-in-flight-limit`) so that all `REQUESTS` futures are no longer
  created immediately. Creating them all up front also caused incorrect
  load measurements.

- Endpoints are also constrained with `tower-in-flight-limit`. By
  default, the limit is that of the load balancer (so endpoints are
  effectively unlimited).

- The `demo.rs` script has been reorganized to account for the new
  runtime, such that all examples are one task chain.

- New output format:
```
REQUESTS=50000
CONCURRENCY=50
ENDPOINT_CAPACITY=50
MAX_ENDPOINT_LATENCIES=[1ms, 10ms, 10ms, 10ms, 10ms, 100ms, 100ms, 100ms, 100ms, 1000ms, ]
P2C+LeastLoaded
  wall   18s
  p50     5ms
  p90    56ms
  p95    80ms
  p99    98ms
  p999  900ms
RoundRobin
  wall   72s
  p50     9ms
  p90    98ms
  p95   488ms
  p99   898ms
  p999  989ms
```
This commit is contained in:
Oliver Gould 2018-06-04 17:54:07 -07:00 committed by GitHub
parent 58b8078fd6
commit 01fd57c053
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 234 additions and 138 deletions

View File

@ -16,6 +16,7 @@ indexmap = "1"
log = "0.4.1"
env_logger = { version = "0.5.3", default-features = false }
hdrsample = "6.0"
tokio-core = "^0.1.12"
tokio-timer = "0.1"
tokio = "0.1.6"
quickcheck = { version = "0.6", default-features = false }
tower-buffer = { version = "0.1", path = "../tower-buffer" }
tower-in-flight-limit = { version = "0.1", path = "../tower-in-flight-limit" }

View File

@ -7,32 +7,175 @@ extern crate hdrsample;
#[macro_use]
extern crate log;
extern crate rand;
extern crate tokio_core;
extern crate tokio_timer;
extern crate tokio;
extern crate tower_balance;
extern crate tower_buffer;
extern crate tower_discover;
extern crate tower_in_flight_limit;
extern crate tower_service;
use futures::{Async, Future, Stream, Poll, future, stream};
use futures::{future, stream, Async, Future, Poll, Stream};
use hdrsample::Histogram;
use rand::Rng;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio_core::reactor::Core;
use tokio_timer::{Timer, TimerError, Sleep};
use tower_balance::*;
use tokio::{runtime, timer};
use tower_balance as lb;
use tower_buffer::Buffer;
use tower_discover::{Change, Discover};
use tower_in_flight_limit::InFlightLimit;
use tower_service::Service;
struct DelayService(Timer, Duration);
// Total number of requests sent through each balancer under test.
const REQUESTS: usize = 50_000;
// Limit on requests in flight at once through the top-level balancer.
const CONCURRENCY: usize = 50;
// In-flight limit applied to each endpoint; equal to `CONCURRENCY`, so a single
// endpoint can absorb everything the balancer is allowed to send.
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
// One entry per simulated endpoint: the upper bound used when picking that
// endpoint's per-request latency.
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
Duration::from_millis(1),
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(10),
Duration::from_millis(100),
Duration::from_millis(100),
Duration::from_millis(100),
Duration::from_millis(100),
Duration::from_millis(1000),
];
struct Delay(Sleep, Instant);
fn main() {
env_logger::init();
struct Disco(VecDeque<Change<usize, DelayService>>);
println!("REQUESTS={}", REQUESTS);
println!("CONCURRENCY={}", CONCURRENCY);
println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
print!("MAX_ENDPOINT_LATENCIES=[");
for max in &MAX_ENDPOINT_LATENCIES {
let l = max.as_secs() * 1_000 + u64::from(max.subsec_nanos() / 1_000 / 1_000);
print!("{}ms, ", l);
}
println!("]");
let mut rt = runtime::Runtime::new().unwrap();
let executor = rt.executor();
let exec = executor.clone();
let fut = future::lazy(move || {
let d = gen_disco(exec.clone());
let ll = lb::power_of_two_choices(lb::load::WithPendingRequests::new(d));
run("P2C+LeastLoaded", ll, &exec)
});
let exec = executor;
let fut = fut.and_then(move |_| {
let rr = lb::round_robin(gen_disco(exec.clone()));
run("RoundRobin", rr, &exec)
});
rt.spawn(fut);
rt.shutdown_on_idle().wait().unwrap();
}
fn gen_disco(executor: runtime::TaskExecutor) -> Disco {
use self::Change::Insert;
let mut changes = VecDeque::new();
for (i, latency) in MAX_ENDPOINT_LATENCIES.iter().enumerate() {
changes.push_back(Insert(i, DelayService(*latency)));
}
Disco { changes, executor }
}
/// Runs one benchmark pass against the balancer `lb`.
///
/// Prints `name` as a heading, sends `REQUESTS` requests with at most
/// `CONCURRENCY` in flight, folds the response latencies into a histogram and
/// prints the report together with the elapsed wall-clock time.
///
/// Any error produced by the request stream is discarded so the returned
/// future has `Error = ()`.
fn run<D, C>(
    name: &'static str,
    lb: lb::Balance<D, C>,
    executor: &runtime::TaskExecutor,
) -> impl Future<Item = (), Error = ()>
where
    D: Discover<Request = Req, Response = Rsp> + Send + 'static,
    D::Key: Send,
    D::Service: Send,
    D::Error: Send,
    D::DiscoverError: Send,
    <D::Service as Service>::Future: Send,
    C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
    println!("{}", name);

    // Start the clock before the request stream is constructed.
    let started = Instant::now();
    let requests = SendRequests::new(lb, REQUESTS, CONCURRENCY, executor);

    compute_histo(requests)
        .map(move |histo| report(&histo, started.elapsed()))
        .map_err(|_| ())
}
/// Folds a stream of responses into a histogram of latencies in whole
/// milliseconds.
///
/// The returned future resolves to the histogram once `times` ends, or fails
/// with the stream's error.
fn compute_histo<S>(times: S) -> impl Future<Item = Histogram<u64>, Error = S::Error> + 'static
where
    S: Stream<Item = Rsp> + 'static,
{
    // Track values up to 3_000 (ms) at 3 significant figures.
    // NOTE(review): this comment used to cite a 2000ms max delay; the largest
    // entry in MAX_ENDPOINT_LATENCIES is 1000ms.
    let empty = Histogram::<u64>::new_with_max(3_000, 3).unwrap();

    times.fold(empty, |mut histo, rsp| {
        let latency = rsp.latency;
        // Whole seconds plus the sub-second remainder, both truncated to ms.
        let whole_ms = latency.as_secs() * 1_000;
        let frac_ms = u64::from(latency.subsec_nanos()) / 1_000 / 1_000;
        histo += whole_ms + frac_ms;
        future::ok(histo)
    })
}
/// Prints the wall-clock time followed by latency percentiles from `histo`.
///
/// A percentile is printed only if the histogram holds at least the listed
/// number of samples; output stops at the first percentile that lacks them.
fn report(histo: &Histogram<u64>, elapsed: Duration) {
    println!(" wall {:4}s", elapsed.as_secs());

    // (minimum sample count, label, quantile), in ascending order.
    let percentiles = [
        (2, "p50", 0.5),
        (10, "p90", 0.9),
        (50, "p95", 0.95),
        (100, "p99", 0.99),
        (1000, "p999", 0.999),
    ];

    for &(min_samples, label, quantile) in percentiles.iter() {
        if histo.len() < min_samples {
            return;
        }
        println!(" {} {:4}ms", label, histo.value_at_quantile(quantile));
    }
}
/// A simulated endpoint. The wrapped `Duration` is the upper bound used when
/// choosing a random latency for each request.
#[derive(Debug)]
struct DelayService(Duration);
/// Response future of `DelayService`: waits on a timer, then reports how long
/// the request took.
#[derive(Debug)]
struct Delay {
// Timer that completes when the simulated latency has elapsed.
delay: timer::Delay,
// When the request was issued; the reported latency is measured from here.
start: Instant,
}
/// Discovery source backed by a fixed queue of endpoint changes.
struct Disco {
// Pending changes, handed out front-first as the source is polled.
changes: VecDeque<Change<usize, DelayService>>,
// Executor passed to `Buffer::new` when an inserted endpoint is wrapped.
executor: runtime::TaskExecutor,
}
/// Request sent to a simulated endpoint; carries no data.
#[derive(Debug)]
struct Req;
/// Response from a simulated endpoint.
#[derive(Debug)]
struct Rsp {
// Time between the request being issued and its delay completing.
latency: Duration,
}
impl Service for DelayService {
type Request = ();
type Response = Duration;
type Error = TimerError;
type Request = Req;
type Response = Rsp;
type Error = timer::Error;
type Future = Delay;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
@ -40,74 +183,102 @@ impl Service for DelayService {
Ok(Async::Ready(()))
}
fn call(&mut self, _: ()) -> Delay {
Delay(self.0.sleep(self.1), Instant::now())
fn call(&mut self, _: Req) -> Delay {
let start = Instant::now();
let maxms = u64::from(self.0.subsec_nanos() / 1_000 / 1_000)
.saturating_add(self.0.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
Delay {
delay: timer::Delay::new(start + latency),
start,
}
}
}
impl Future for Delay {
type Item = Duration;
type Error = TimerError;
type Item = Rsp;
type Error = timer::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
try_ready!(self.0.poll());
Ok(Async::Ready(self.1.elapsed()))
try_ready!(self.delay.poll());
let rsp = Rsp {
latency: Instant::now() - self.start,
};
Ok(Async::Ready(rsp))
}
}
impl Discover for Disco {
type Key = usize;
type Request = ();
type Response = Duration;
type Error = TimerError;
type Service = DelayService;
type Request = Req;
type Response = Rsp;
type Error = tower_in_flight_limit::Error<tower_buffer::Error<timer::Error>>;
type Service = InFlightLimit<Buffer<DelayService>>;
type DiscoverError = ();
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
let r = self.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
match self.changes.pop_front() {
Some(Change::Insert(k, svc)) => {
let svc = Buffer::new(svc, &self.executor).unwrap();
let svc = InFlightLimit::new(svc, ENDPOINT_CAPACITY);
Ok(Async::Ready(Change::Insert(k, svc)))
}
Some(Change::Remove(k)) => Ok(Async::Ready(Change::Remove(k))),
None => Ok(Async::NotReady),
}
}
}
fn gen_disco(timer: &Timer) -> Disco {
use self::Change::Insert;
let mut changes = VecDeque::new();
let quick = Duration::from_millis(500);
for i in 0..8 {
changes.push_back(Insert(i, DelayService(timer.clone(), quick)));
}
let slow = Duration::from_secs(2);
changes.push_back((Insert(9, DelayService(timer.clone(), slow))));
Disco(changes)
}
struct SendRequests<D, C>
where
D: Discover<Request = (), Response = Duration, Error = TimerError>,
C: Choose<D::Key, D::Service>,
D: Discover<Request = Req, Response = Rsp>,
C: lb::Choose<D::Key, D::Service>,
{
lb: Balance<D, C>,
send_remaining: usize,
responses: stream::FuturesUnordered<ResponseFuture<<D::Service as Service>::Future, D::DiscoverError>>,
lb: InFlightLimit<Buffer<lb::Balance<D, C>>>,
responses: stream::FuturesUnordered<
tower_in_flight_limit::ResponseFuture<tower_buffer::ResponseFuture<lb::Balance<D, C>>>,
>,
}
impl<D, C> SendRequests<D, C>
where
D: Discover<Request = Req, Response = Rsp> + Send + 'static,
D::Key: Send,
D::Service: Send,
D::Error: Send,
D::DiscoverError: Send,
<D::Service as Service>::Future: Send,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
pub fn new(
lb: lb::Balance<D, C>,
total: usize,
concurrency: usize,
executor: &runtime::TaskExecutor,
) -> Self {
Self {
send_remaining: total,
lb: InFlightLimit::new(Buffer::new(lb, executor).ok().expect("buffer"), concurrency),
responses: stream::FuturesUnordered::new(),
}
}
}
impl<D, C> Stream for SendRequests<D, C>
where
D: Discover<Request = (), Response = Duration, Error = TimerError>,
C: Choose<D::Key, D::Service>,
D: Discover<Request = Req, Response = Rsp>,
C: lb::Choose<D::Key, D::Service>,
{
type Item = Duration;
type Error = Error<D::Error, D::DiscoverError>;
type Item = Rsp;
type Error =
tower_in_flight_limit::Error<tower_buffer::Error<<lb::Balance<D, C> as Service>::Error>>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
debug!("sending requests {} / {}", self.send_remaining, self.responses.len());
debug!(
"sending requests {} / {}",
self.send_remaining,
self.responses.len()
);
while self.send_remaining > 0 {
if !self.responses.is_empty() {
if let Async::Ready(Some(rsp)) = self.responses.poll()? {
@ -115,16 +286,14 @@ where
}
}
if self.send_remaining > 0 {
debug!("polling lb ready");
try_ready!(self.lb.poll_ready());
debug!("polling lb ready");
try_ready!(self.lb.poll_ready());
debug!("sending request");
let rsp = self.lb.call(());
self.responses.push(rsp);
debug!("sending request");
let rsp = self.lb.call(Req);
self.responses.push(rsp);
self.send_remaining -= 1;
}
self.send_remaining -= 1;
}
if !self.responses.is_empty() {
@ -134,77 +303,3 @@ where
Ok(Async::Ready(None))
}
}
fn compute_histo<S>(times: S)
-> Box<Future<Item = Histogram<u64>, Error = S::Error> + 'static>
where
S: Stream<Item = Duration> + 'static
{
// The max delay is 2000ms. At 3 significant figures.
let histo = Histogram::<u64>::new_with_max(3_000, 3).unwrap();
let fut = times
.fold(histo, |mut histo, elapsed| {
let ns: u32 = elapsed.subsec_nanos();
let ms = u64::from(ns) / 1_000 / 1_000
+ elapsed.as_secs() * 1_000;
histo += ms;
future::ok(histo)
});
Box::new(fut)
}
fn report(pfx: &str, histo: &Histogram<u64>) {
println!("{} samples: {}", pfx, histo.len());
if histo.len () < 2 {
return;
}
println!("{} p50: {}", pfx, histo.value_at_quantile(0.5));
if histo.len () < 10 {
return;
}
println!("{} p90: {}", pfx, histo.value_at_quantile(0.9));
if histo.len () < 50 {
return;
}
println!("{} p95: {}", pfx, histo.value_at_quantile(0.95));
if histo.len () < 100 {
return;
}
println!("{} p99: {}", pfx, histo.value_at_quantile(0.99));
if histo.len () < 1000 {
return;
}
println!("{} p999: {}", pfx, histo.value_at_quantile(0.999));
}
fn main() {
env_logger::init();
let timer = Timer::default();
let mut core = Core::new().unwrap();
let requests = 1_000_000;
{
let lb = {
let loaded = load::WithPendingRequests::new(gen_disco(&timer));
power_of_two_choices(loaded)
};
let send = SendRequests { lb, send_remaining: requests, responses: stream::FuturesUnordered::new() };
let histo = core.run(compute_histo(send)).unwrap();
report("p2c", &histo)
}
{
let lb = round_robin(gen_disco(&timer));
let send = SendRequests { lb, send_remaining: requests, responses: stream::FuturesUnordered::new() };
let histo = core.run(compute_histo(send)).unwrap();
report("rr", &histo)
}
}

View File

@ -13,8 +13,8 @@ extern crate tower_service;
use futures::{Async, Future, Poll};
use indexmap::IndexMap;
use std::{fmt, error};
use std::marker::PhantomData;
use std::{error, fmt};
use tower_discover::Discover;
use tower_service::Service;