From 2448ca9cdc7c2ad59925e440056e2beb6c30bcf5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 29 Mar 2019 14:24:43 -0700 Subject: [PATCH] balance: cleanup example (#211) --- tower-balance/Cargo.toml | 1 + tower-balance/examples/demo.rs | 244 ++++++--------------------------- 2 files changed, 40 insertions(+), 205 deletions(-) diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index 93d8bd5..8895ae7 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -21,5 +21,6 @@ hdrsample = "6.0" quickcheck = { version = "0.6", default-features = false } tokio = "0.1.7" tokio-executor = "0.1.2" +tower = { version = "0.1", path = "../tower" } tower-buffer = { version = "0.1", path = "../tower-buffer" } tower-in-flight-limit = { version = "0.1", path = "../tower-in-flight-limit" } diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index 8b2f7f2..0e0b06d 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -1,31 +1,30 @@ //! Exercises load balancers with mocked services. extern crate env_logger; -#[macro_use] extern crate futures; extern crate hdrsample; -#[macro_use] extern crate log; extern crate rand; extern crate tokio; +extern crate tower; extern crate tower_balance; extern crate tower_buffer; extern crate tower_discover; extern crate tower_in_flight_limit; extern crate tower_service; +extern crate tower_service_util; -use futures::{future, stream, Async, Future, Poll, Stream}; +use futures::{future, stream, Future, Stream}; use hdrsample::Histogram; use rand::Rng; -use std::collections::VecDeque; -use std::fmt; use std::time::{Duration, Instant}; use tokio::{runtime, timer}; +use tower::ServiceExt; use tower_balance as lb; -use tower_buffer::Buffer; -use tower_discover::{Change, Discover}; +use tower_discover::Discover; use tower_in_flight_limit::InFlightLimit; use tower_service::Service; +use tower_service_util::ServiceFn; const REQUESTS: usize = 50_000; const CONCURRENCY: usize = 50; @@ -89,46 +88,64 @@ fn main() { rt.shutdown_on_idle().wait().unwrap(); } -fn gen_disco() -> Disco { - use self::Change::Insert; +type Error = Box<::std::error::Error + Send + Sync>; - let mut changes = VecDeque::new(); - for (i, latency) in MAX_ENDPOINT_LATENCIES.iter().enumerate() { - changes.push_back(Insert(i, DelayService(*latency))); - } +fn gen_disco() -> impl Discover< + Key = usize, + Error = impl Into, + Service = impl Service + Send, +> + Send { + tower_discover::ServiceList::new(MAX_ENDPOINT_LATENCIES.iter().map(|latency| { + let svc = ServiceFn::new(move |_| { + let start = Instant::now(); + let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000) + .saturating_add(latency.as_secs().saturating_mul(1_000)); + let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms)); + let delay = timer::Delay::new(start + latency); - Disco { changes } + delay.map(move |_| { + let latency = Instant::now() - start; + Rsp { latency } + }) + }); + + InFlightLimit::new(svc, ENDPOINT_CAPACITY) + })) } fn run(name: &'static str, lb: lb::Balance) -> impl Future where D: Discover + Send + 'static, + D::Error: Into, D::Key: Send, - D::Service: Service + Send, - D::Error: ::std::error::Error + Send + Sync + 'static, + D::Service: Service + Send, >::Future: Send, - >::Error: ::std::error::Error + Send + Sync + 'static, C: lb::Choose + Send + 'static, { println!("{}", name); let t0 = Instant::now(); - compute_histo(SendRequests::new(lb, REQUESTS, CONCURRENCY)) + let requests = stream::repeat::<_, Error>(Req).take(REQUESTS as u64); + let service = InFlightLimit::new(lb, CONCURRENCY); + let responses = service.call_all(requests).unordered(); + + compute_histo(responses) .map(move |h| report(&h, t0.elapsed())) .map_err(|_| {}) } -fn compute_histo(times: S) -> impl Future, Error = S::Error> + 'static +fn compute_histo(times: S) -> impl Future, Error = Error> + 'static where - S: Stream + 'static, + S: Stream + 'static, { // The max delay is 2000ms. At 3 significant figures. let histo = Histogram::::new_with_max(3_000, 3).unwrap(); + times.fold(histo, |mut histo, Rsp { latency }| { let ms = latency.as_secs() * 1_000; let ms = ms + u64::from(latency.subsec_nanos()) / 1_000 / 1_000; histo += ms; - future::ok(histo) + future::ok::<_, Error>(histo) }) } @@ -161,193 +178,10 @@ fn report(histo: &Histogram, elapsed: Duration) { println!(" p999 {:4}ms", histo.value_at_quantile(0.999)); } -#[derive(Debug)] -struct DelayService(Duration); - -#[derive(Debug)] -struct Delay { - delay: timer::Delay, - start: Instant, -} - -struct Disco { - changes: VecDeque>, -} - -#[derive(Debug)] +#[derive(Debug, Clone)] struct Req; #[derive(Debug)] struct Rsp { latency: Duration, } - -impl Service for DelayService { - type Response = Rsp; - type Error = timer::Error; - type Future = Delay; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - debug!("polling delay service: ready"); - Ok(Async::Ready(())) - } - - fn call(&mut self, _: Req) -> Delay { - let start = Instant::now(); - let maxms = u64::from(self.0.subsec_nanos() / 1_000 / 1_000) - .saturating_add(self.0.as_secs().saturating_mul(1_000)); - let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms)); - Delay { - delay: timer::Delay::new(start + latency), - start, - } - } -} - -impl Future for Delay { - type Item = Rsp; - type Error = timer::Error; - fn poll(&mut self) -> Poll { - try_ready!(self.delay.poll()); - let rsp = Rsp { - latency: Instant::now() - self.start, - }; - Ok(Async::Ready(rsp)) - } -} - -impl Discover for Disco { - type Key = usize; - type Error = DiscoError; - type Service = Errify>>; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.changes.pop_front() { - Some(Change::Insert(k, svc)) => { - let svc = Buffer::new(svc, 0).unwrap(); - let svc = Errify(InFlightLimit::new(svc, ENDPOINT_CAPACITY)); - Ok(Async::Ready(Change::Insert(k, svc))) - } - Some(Change::Remove(k)) => Ok(Async::Ready(Change::Remove(k))), - None => Ok(Async::NotReady), - } - } -} - -#[derive(Debug)] -pub struct DiscoError; - -impl fmt::Display for DiscoError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "discovery error") - } -} - -impl std::error::Error for DiscoError {} - -type DemoService = InFlightLimit, Req>>; - -struct SendRequests -where - D: Discover, - D::Error: ::std::error::Error + Send + Sync + 'static, - D::Service: Service, - >::Error: ::std::error::Error + Send + Sync + 'static, - C: lb::Choose, -{ - send_remaining: usize, - lb: DemoService, - responses: stream::FuturesUnordered< as Service>::Future>, -} - -impl SendRequests -where - D: Discover + Send + 'static, - D::Error: ::std::error::Error + Send + Sync + 'static, - D::Service: Service, - >::Error: ::std::error::Error + Send + Sync + 'static, - D::Key: Send, - D::Service: Send, - D::Error: Send + Sync, - >::Future: Send, - >::Error: Send + Sync, - C: lb::Choose + Send + 'static, -{ - pub fn new(lb: lb::Balance, total: usize, concurrency: usize) -> Self { - Self { - send_remaining: total, - lb: InFlightLimit::new(Buffer::new(lb, 0).ok().expect("buffer"), concurrency), - responses: stream::FuturesUnordered::new(), - } - } -} - -impl Stream for SendRequests -where - D: Discover, - D::Error: ::std::error::Error + Send + Sync + 'static, - D::Service: Service, - >::Error: ::std::error::Error + Send + Sync + 'static, - C: lb::Choose, -{ - type Item = as Service>::Response; - type Error = as Service>::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - debug!( - "sending requests {} / {}", - self.send_remaining, - self.responses.len() - ); - while self.send_remaining > 0 { - if !self.responses.is_empty() { - if let Async::Ready(Some(rsp)) = self.responses.poll()? { - return Ok(Async::Ready(Some(rsp))); - } - } - - debug!("polling lb ready"); - try_ready!(self.lb.poll_ready()); - - debug!("sending request"); - let rsp = self.lb.call(Req); - self.responses.push(rsp); - - self.send_remaining -= 1; - } - - if !self.responses.is_empty() { - return self.responses.poll(); - } - - Ok(Async::Ready(None)) - } -} - -pub struct Errify(T); - -impl Service for Errify -where - T: Service, -{ - type Response = T::Response; - type Error = DiscoError; - type Future = Errify; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready().map_err(|_| DiscoError) - } - - fn call(&mut self, request: Request) -> Self::Future { - Errify(self.0.call(request)) - } -} - -impl Future for Errify { - type Item = T::Item; - type Error = DiscoError; - - fn poll(&mut self) -> Poll { - self.0.poll().map_err(|_| DiscoError) - } -}