From 115125b1663ab40368804431b70c1bcb8ce70422 Mon Sep 17 00:00:00 2001
From: Jane Lusby <jlusby@yaah.dev>
Date: Wed, 24 Jun 2020 12:04:10 -0700
Subject: [PATCH] add back broken tower-balance example

---
 tower/examples/tower-balance.rs | 226 ++++++++++++++++++++++++++++++++
 1 file changed, 226 insertions(+)
 create mode 100644 tower/examples/tower-balance.rs

diff --git a/tower/examples/tower-balance.rs b/tower/examples/tower-balance.rs
new file mode 100644
index 0000000..649cbeb
--- /dev/null
+++ b/tower/examples/tower-balance.rs
@@ -0,0 +1,226 @@
+//! Exercises load balancers with mocked services.
+
+use futures_core::{Stream, TryStream};
+use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
+use hdrhistogram::Histogram;
+use pin_project::pin_project;
+use rand::{self, Rng};
+use std::hash::Hash;
+use std::time::Duration;
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+use tokio::time::{self, Instant};
+use tower::balance as lb;
+use tower::discover::{Change, Discover};
+use tower::limit::concurrency::ConcurrencyLimit;
+use tower::load;
+use tower::util::ServiceExt;
+use tower_service::Service;
+
+const REQUESTS: usize = 100_000;
+const CONCURRENCY: usize = 500;
+const DEFAULT_RTT: Duration = Duration::from_millis(30);
+static ENDPOINT_CAPACITY: usize = CONCURRENCY;
+static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
+    Duration::from_millis(1),
+    Duration::from_millis(5),
+    Duration::from_millis(10),
+    Duration::from_millis(10),
+    Duration::from_millis(10),
+    Duration::from_millis(100),
+    Duration::from_millis(100),
+    Duration::from_millis(100),
+    Duration::from_millis(500),
+    Duration::from_millis(1000),
+];
+
+struct Summary {
+    latencies: Histogram<u64>,
+    start: Instant,
+    count_by_instance: [usize; 10],
+}
+
+#[tokio::main]
+async fn main() {
+    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
+
+    println!("REQUESTS={}", REQUESTS);
+    println!("CONCURRENCY={}", CONCURRENCY);
+    println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
+    print!("MAX_ENDPOINT_LATENCIES=[");
+    for max in &MAX_ENDPOINT_LATENCIES {
+        let l = max.as_secs() * 1_000 + u64::from(max.subsec_nanos() / 1_000 / 1_000);
+        print!("{}ms, ", l);
+    }
+    println!("]");
+
+    let decay = Duration::from_secs(10);
+    let d = gen_disco();
+    let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
+        d,
+        DEFAULT_RTT,
+        decay,
+        load::CompleteOnResponse::default(),
+    ));
+    run("P2C+PeakEWMA...", pe).await;
+
+    let d = gen_disco();
+    let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
+        d,
+        load::CompleteOnResponse::default(),
+    ));
+    run("P2C+LeastLoaded...", ll).await;
+}
+
+type Error = Box<dyn std::error::Error + Send + Sync>;
+
+type Key = usize;
+
+#[pin_project]
+struct Disco<S>(Vec<(Key, S)>);
+
+impl<S> Stream for Disco<S>
+where
+    S: Service<Req, Response = Rsp, Error = Error>,
+{
+    type Item = Result<Change<Key, S>, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.project().0.pop() {
+            Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
+            None => {
+                // there may be more later
+                Poll::Pending
+            }
+        }
+    }
+}
+
+fn gen_disco() -> impl Discover<
+    Key = Key,
+    Error = Error,
+    Service = ConcurrencyLimit<
+        impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
+    >,
+> + Send {
+    Disco(
+        MAX_ENDPOINT_LATENCIES
+            .iter()
+            .enumerate()
+            .map(|(instance, latency)| {
+                let svc = tower::service_fn(move |_| {
+                    let start = Instant::now();
+
+                    let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
+                        .saturating_add(latency.as_secs().saturating_mul(1_000));
+                    let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
+
+                    async move {
+                        time::delay_until(start + latency).await;
+                        let latency = start.elapsed();
+                        Ok(Rsp { latency, instance })
+                    }
+                });
+
+                (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
+            })
+            .collect(),
+    )
+}
+
+async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
+where
+    D: Discover + Unpin + Send + 'static,
+    D::Error: Into<Error>,
+    D::Key: Clone + Send + Hash,
+    D::Service: Service<Req, Response = Rsp> + load::Load + Send,
+    <D::Service as Service<Req>>::Error: Into<Error>,
+    <D::Service as Service<Req>>::Future: Send,
+    <D::Service as load::Load>::Metric: std::fmt::Debug,
+{
+    println!("{}", name);
+
+    let requests = stream::repeat(Req).take(REQUESTS);
+    let service = ConcurrencyLimit::new(lb, CONCURRENCY);
+    let responses = service.call_all(requests).unordered();
+
+    compute_histo(responses).await.unwrap().report();
+}
+
+async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
+where
+    S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
+{
+    let mut summary = Summary::new();
+    while let Some(rsp) = times.try_next().await? {
+        summary.count(rsp);
+    }
+    Ok(summary)
+}
+
+impl Summary {
+    fn new() -> Self {
+        Self {
+            // The max delay is 2000ms. At 3 significant figures.
+            latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
+            start: Instant::now(),
+            count_by_instance: [0; 10],
+        }
+    }
+
+    fn count(&mut self, rsp: Rsp) {
+        let ms = rsp.latency.as_secs() * 1_000;
+        let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
+        self.latencies += ms;
+        self.count_by_instance[rsp.instance] += 1;
+    }
+
+    fn report(&self) {
+        let mut total = 0;
+        for c in &self.count_by_instance {
+            total += c;
+        }
+        for (i, c) in self.count_by_instance.iter().enumerate() {
+            let p = *c as f64 / total as f64 * 100.0;
+            println!("  [{:02}] {:>5.01}%", i, p);
+        }
+
+        println!("  wall {:4}s", self.start.elapsed().as_secs());
+
+        if self.latencies.len() < 2 {
+            return;
+        }
+        println!("  p50  {:4}ms", self.latencies.value_at_quantile(0.5));
+
+        if self.latencies.len() < 10 {
+            return;
+        }
+        println!("  p90  {:4}ms", self.latencies.value_at_quantile(0.9));
+
+        if self.latencies.len() < 50 {
+            return;
+        }
+        println!("  p95  {:4}ms", self.latencies.value_at_quantile(0.95));
+
+        if self.latencies.len() < 100 {
+            return;
+        }
+        println!("  p99  {:4}ms", self.latencies.value_at_quantile(0.99));
+
+        if self.latencies.len() < 1000 {
+            return;
+        }
+        println!("  p999 {:4}ms", self.latencies.value_at_quantile(0.999));
+    }
+}
+
+#[derive(Debug, Clone)]
+struct Req;
+
+#[derive(Debug)]
+struct Rsp {
+    latency: Duration,
+    instance: usize,
+}