add back broken tower-balance example
This commit is contained in:
parent
2c3a500c10
commit
115125b166
|
@ -0,0 +1,226 @@
|
|||
//! Exercises load balancers with mocked services.
|
||||
|
||||
use futures_core::{Stream, TryStream};
|
||||
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
|
||||
use hdrhistogram::Histogram;
|
||||
use pin_project::pin_project;
|
||||
use rand::{self, Rng};
|
||||
use std::hash::Hash;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::time::{self, Instant};
|
||||
use tower::balance as lb;
|
||||
use tower::discover::{Change, Discover};
|
||||
use tower::limit::concurrency::ConcurrencyLimit;
|
||||
use tower::load;
|
||||
use tower::util::ServiceExt;
|
||||
use tower_service::Service;
|
||||
|
||||
const REQUESTS: usize = 100_000;
|
||||
const CONCURRENCY: usize = 500;
|
||||
const DEFAULT_RTT: Duration = Duration::from_millis(30);
|
||||
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
|
||||
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
|
||||
Duration::from_millis(1),
|
||||
Duration::from_millis(5),
|
||||
Duration::from_millis(10),
|
||||
Duration::from_millis(10),
|
||||
Duration::from_millis(10),
|
||||
Duration::from_millis(100),
|
||||
Duration::from_millis(100),
|
||||
Duration::from_millis(100),
|
||||
Duration::from_millis(500),
|
||||
Duration::from_millis(1000),
|
||||
];
|
||||
|
||||
struct Summary {
|
||||
latencies: Histogram<u64>,
|
||||
start: Instant,
|
||||
count_by_instance: [usize; 10],
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
|
||||
|
||||
println!("REQUESTS={}", REQUESTS);
|
||||
println!("CONCURRENCY={}", CONCURRENCY);
|
||||
println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
|
||||
print!("MAX_ENDPOINT_LATENCIES=[");
|
||||
for max in &MAX_ENDPOINT_LATENCIES {
|
||||
let l = max.as_secs() * 1_000 + u64::from(max.subsec_nanos() / 1_000 / 1_000);
|
||||
print!("{}ms, ", l);
|
||||
}
|
||||
println!("]");
|
||||
|
||||
let decay = Duration::from_secs(10);
|
||||
let d = gen_disco();
|
||||
let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
|
||||
d,
|
||||
DEFAULT_RTT,
|
||||
decay,
|
||||
load::CompleteOnResponse::default(),
|
||||
));
|
||||
run("P2C+PeakEWMA...", pe).await;
|
||||
|
||||
let d = gen_disco();
|
||||
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
|
||||
d,
|
||||
load::CompleteOnResponse::default(),
|
||||
));
|
||||
run("P2C+LeastLoaded...", ll).await;
|
||||
}
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
type Key = usize;
|
||||
|
||||
#[pin_project]
|
||||
struct Disco<S>(Vec<(Key, S)>);
|
||||
|
||||
impl<S> Stream for Disco<S>
|
||||
where
|
||||
S: Service<Req, Response = Rsp, Error = Error>,
|
||||
{
|
||||
type Item = Result<Change<Key, S>, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match self.project().0.pop() {
|
||||
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
|
||||
None => {
|
||||
// there may be more later
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn gen_disco() -> impl Discover<
|
||||
Key = Key,
|
||||
Error = Error,
|
||||
Service = ConcurrencyLimit<
|
||||
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
|
||||
>,
|
||||
> + Send {
|
||||
Disco(
|
||||
MAX_ENDPOINT_LATENCIES
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(instance, latency)| {
|
||||
let svc = tower::service_fn(move |_| {
|
||||
let start = Instant::now();
|
||||
|
||||
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
|
||||
.saturating_add(latency.as_secs().saturating_mul(1_000));
|
||||
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
|
||||
|
||||
async move {
|
||||
time::delay_until(start + latency).await;
|
||||
let latency = start.elapsed();
|
||||
Ok(Rsp { latency, instance })
|
||||
}
|
||||
});
|
||||
|
||||
(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
|
||||
where
|
||||
D: Discover + Unpin + Send + 'static,
|
||||
D::Error: Into<Error>,
|
||||
D::Key: Clone + Send + Hash,
|
||||
D::Service: Service<Req, Response = Rsp> + load::Load + Send,
|
||||
<D::Service as Service<Req>>::Error: Into<Error>,
|
||||
<D::Service as Service<Req>>::Future: Send,
|
||||
<D::Service as load::Load>::Metric: std::fmt::Debug,
|
||||
{
|
||||
println!("{}", name);
|
||||
|
||||
let requests = stream::repeat(Req).take(REQUESTS);
|
||||
let service = ConcurrencyLimit::new(lb, CONCURRENCY);
|
||||
let responses = service.call_all(requests).unordered();
|
||||
|
||||
compute_histo(responses).await.unwrap().report();
|
||||
}
|
||||
|
||||
async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
|
||||
where
|
||||
S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
|
||||
{
|
||||
let mut summary = Summary::new();
|
||||
while let Some(rsp) = times.try_next().await? {
|
||||
summary.count(rsp);
|
||||
}
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
// The max delay is 2000ms. At 3 significant figures.
|
||||
latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
|
||||
start: Instant::now(),
|
||||
count_by_instance: [0; 10],
|
||||
}
|
||||
}
|
||||
|
||||
fn count(&mut self, rsp: Rsp) {
|
||||
let ms = rsp.latency.as_secs() * 1_000;
|
||||
let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
|
||||
self.latencies += ms;
|
||||
self.count_by_instance[rsp.instance] += 1;
|
||||
}
|
||||
|
||||
fn report(&self) {
|
||||
let mut total = 0;
|
||||
for c in &self.count_by_instance {
|
||||
total += c;
|
||||
}
|
||||
for (i, c) in self.count_by_instance.iter().enumerate() {
|
||||
let p = *c as f64 / total as f64 * 100.0;
|
||||
println!(" [{:02}] {:>5.01}%", i, p);
|
||||
}
|
||||
|
||||
println!(" wall {:4}s", self.start.elapsed().as_secs());
|
||||
|
||||
if self.latencies.len() < 2 {
|
||||
return;
|
||||
}
|
||||
println!(" p50 {:4}ms", self.latencies.value_at_quantile(0.5));
|
||||
|
||||
if self.latencies.len() < 10 {
|
||||
return;
|
||||
}
|
||||
println!(" p90 {:4}ms", self.latencies.value_at_quantile(0.9));
|
||||
|
||||
if self.latencies.len() < 50 {
|
||||
return;
|
||||
}
|
||||
println!(" p95 {:4}ms", self.latencies.value_at_quantile(0.95));
|
||||
|
||||
if self.latencies.len() < 100 {
|
||||
return;
|
||||
}
|
||||
println!(" p99 {:4}ms", self.latencies.value_at_quantile(0.99));
|
||||
|
||||
if self.latencies.len() < 1000 {
|
||||
return;
|
||||
}
|
||||
println!(" p999 {:4}ms", self.latencies.value_at_quantile(0.999));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Req;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Rsp {
|
||||
latency: Duration,
|
||||
instance: usize,
|
||||
}
|
Loading…
Reference in New Issue