balance: cleanup example (#211)

This commit is contained in:
Carl Lerche 2019-03-29 14:24:43 -07:00 committed by GitHub
parent da4e22c89d
commit 2448ca9cdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 205 deletions

View File

@ -21,5 +21,6 @@ hdrsample = "6.0"
quickcheck = { version = "0.6", default-features = false }
tokio = "0.1.7"
tokio-executor = "0.1.2"
tower = { version = "0.1", path = "../tower" }
tower-buffer = { version = "0.1", path = "../tower-buffer" }
tower-in-flight-limit = { version = "0.1", path = "../tower-in-flight-limit" }

View File

@ -1,31 +1,30 @@
//! Exercises load balancers with mocked services.
extern crate env_logger;
#[macro_use]
extern crate futures;
extern crate hdrsample;
#[macro_use]
extern crate log;
extern crate rand;
extern crate tokio;
extern crate tower;
extern crate tower_balance;
extern crate tower_buffer;
extern crate tower_discover;
extern crate tower_in_flight_limit;
extern crate tower_service;
extern crate tower_service_util;
use futures::{future, stream, Async, Future, Poll, Stream};
use futures::{future, stream, Future, Stream};
use hdrsample::Histogram;
use rand::Rng;
use std::collections::VecDeque;
use std::fmt;
use std::time::{Duration, Instant};
use tokio::{runtime, timer};
use tower::ServiceExt;
use tower_balance as lb;
use tower_buffer::Buffer;
use tower_discover::{Change, Discover};
use tower_discover::Discover;
use tower_in_flight_limit::InFlightLimit;
use tower_service::Service;
use tower_service_util::ServiceFn;
const REQUESTS: usize = 50_000;
const CONCURRENCY: usize = 50;
@ -89,46 +88,64 @@ fn main() {
rt.shutdown_on_idle().wait().unwrap();
}
fn gen_disco() -> Disco {
use self::Change::Insert;
type Error = Box<::std::error::Error + Send + Sync>;
let mut changes = VecDeque::new();
for (i, latency) in MAX_ENDPOINT_LATENCIES.iter().enumerate() {
changes.push_back(Insert(i, DelayService(*latency)));
}
fn gen_disco() -> impl Discover<
Key = usize,
Error = impl Into<Error>,
Service = impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
> + Send {
tower_discover::ServiceList::new(MAX_ENDPOINT_LATENCIES.iter().map(|latency| {
let svc = ServiceFn::new(move |_| {
let start = Instant::now();
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
.saturating_add(latency.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
let delay = timer::Delay::new(start + latency);
Disco { changes }
delay.map(move |_| {
let latency = Instant::now() - start;
Rsp { latency }
})
});
InFlightLimit::new(svc, ENDPOINT_CAPACITY)
}))
}
fn run<D, C>(name: &'static str, lb: lb::Balance<D, C>) -> impl Future<Item = (), Error = ()>
where
D: Discover + Send + 'static,
D::Error: Into<Error>,
D::Key: Send,
D::Service: Service<Req, Response = Rsp> + Send,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req, Response = Rsp, Error = Error> + Send,
<D::Service as Service<Req>>::Future: Send,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
println!("{}", name);
let t0 = Instant::now();
compute_histo(SendRequests::new(lb, REQUESTS, CONCURRENCY))
let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64);
let service = InFlightLimit::new(lb, CONCURRENCY);
let responses = service.call_all(requests).unordered();
compute_histo(responses)
.map(move |h| report(&h, t0.elapsed()))
.map_err(|_| {})
}
fn compute_histo<S>(times: S) -> impl Future<Item = Histogram<u64>, Error = S::Error> + 'static
fn compute_histo<S>(times: S) -> impl Future<Item = Histogram<u64>, Error = Error> + 'static
where
S: Stream<Item = Rsp> + 'static,
S: Stream<Item = Rsp, Error = Error> + 'static,
{
// The max delay is 2000ms. At 3 significant figures.
let histo = Histogram::<u64>::new_with_max(3_000, 3).unwrap();
times.fold(histo, |mut histo, Rsp { latency }| {
let ms = latency.as_secs() * 1_000;
let ms = ms + u64::from(latency.subsec_nanos()) / 1_000 / 1_000;
histo += ms;
future::ok(histo)
future::ok::<_, Error>(histo)
})
}
@ -161,193 +178,10 @@ fn report(histo: &Histogram<u64>, elapsed: Duration) {
println!(" p999 {:4}ms", histo.value_at_quantile(0.999));
}
#[derive(Debug)]
struct DelayService(Duration);
#[derive(Debug)]
struct Delay {
delay: timer::Delay,
start: Instant,
}
struct Disco {
changes: VecDeque<Change<usize, DelayService>>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct Req;
#[derive(Debug)]
struct Rsp {
latency: Duration,
}
impl Service<Req> for DelayService {
type Response = Rsp;
type Error = timer::Error;
type Future = Delay;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
debug!("polling delay service: ready");
Ok(Async::Ready(()))
}
fn call(&mut self, _: Req) -> Delay {
let start = Instant::now();
let maxms = u64::from(self.0.subsec_nanos() / 1_000 / 1_000)
.saturating_add(self.0.as_secs().saturating_mul(1_000));
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
Delay {
delay: timer::Delay::new(start + latency),
start,
}
}
}
impl Future for Delay {
type Item = Rsp;
type Error = timer::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
try_ready!(self.delay.poll());
let rsp = Rsp {
latency: Instant::now() - self.start,
};
Ok(Async::Ready(rsp))
}
}
impl Discover for Disco {
type Key = usize;
type Error = DiscoError;
type Service = Errify<InFlightLimit<Buffer<DelayService, Req>>>;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
match self.changes.pop_front() {
Some(Change::Insert(k, svc)) => {
let svc = Buffer::new(svc, 0).unwrap();
let svc = Errify(InFlightLimit::new(svc, ENDPOINT_CAPACITY));
Ok(Async::Ready(Change::Insert(k, svc)))
}
Some(Change::Remove(k)) => Ok(Async::Ready(Change::Remove(k))),
None => Ok(Async::NotReady),
}
}
}
#[derive(Debug)]
pub struct DiscoError;
impl fmt::Display for DiscoError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "discovery error")
}
}
impl std::error::Error for DiscoError {}
type DemoService<D, C> = InFlightLimit<Buffer<lb::Balance<D, C>, Req>>;
struct SendRequests<D, C>
where
D: Discover,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
C: lb::Choose<D::Key, D::Service>,
{
send_remaining: usize,
lb: DemoService<D, C>,
responses: stream::FuturesUnordered<<DemoService<D, C> as Service<Req>>::Future>,
}
impl<D, C> SendRequests<D, C>
where
D: Discover + Send + 'static,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
D::Key: Send,
D::Service: Send,
D::Error: Send + Sync,
<D::Service as Service<Req>>::Future: Send,
<D::Service as Service<Req>>::Error: Send + Sync,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
pub fn new(lb: lb::Balance<D, C>, total: usize, concurrency: usize) -> Self {
Self {
send_remaining: total,
lb: InFlightLimit::new(Buffer::new(lb, 0).ok().expect("buffer"), concurrency),
responses: stream::FuturesUnordered::new(),
}
}
}
impl<D, C> Stream for SendRequests<D, C>
where
D: Discover,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
C: lb::Choose<D::Key, D::Service>,
{
type Item = <DemoService<D, C> as Service<Req>>::Response;
type Error = <DemoService<D, C> as Service<Req>>::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
debug!(
"sending requests {} / {}",
self.send_remaining,
self.responses.len()
);
while self.send_remaining > 0 {
if !self.responses.is_empty() {
if let Async::Ready(Some(rsp)) = self.responses.poll()? {
return Ok(Async::Ready(Some(rsp)));
}
}
debug!("polling lb ready");
try_ready!(self.lb.poll_ready());
debug!("sending request");
let rsp = self.lb.call(Req);
self.responses.push(rsp);
self.send_remaining -= 1;
}
if !self.responses.is_empty() {
return self.responses.poll();
}
Ok(Async::Ready(None))
}
}
pub struct Errify<T>(T);
impl<T, Request> Service<Request> for Errify<T>
where
T: Service<Request>,
{
type Response = T::Response;
type Error = DiscoError;
type Future = Errify<T::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready().map_err(|_| DiscoError)
}
fn call(&mut self, request: Request) -> Self::Future {
Errify(self.0.call(request))
}
}
impl<T: Future> Future for Errify<T> {
type Item = T::Item;
type Error = DiscoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(|_| DiscoError)
}
}