diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index 4883ec1..edab573 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -27,6 +27,7 @@ futures = "0.1.26" indexmap = "1.0.2" log = "0.4.1" rand = "0.6.5" +tokio-sync = "0.1.3" tokio-timer = "0.2.4" tower-discover = "0.1.0" tower-layer = "0.1.0" @@ -41,7 +42,7 @@ hdrhistogram = "6.0" quickcheck = { version = "0.6", default-features = false } tokio = "0.1.7" tokio-executor = "0.1.2" -tower = { version = "0.1", path = "../tower" } -tower-buffer = { version = "0.1", path = "../tower-buffer" } -tower-limit = { version = "0.1", path = "../tower-limit" } -tower-test = { version = "0.1.0", path = "../tower-test" } +tower = { version = "*", path = "../tower" } +tower-buffer = { version = "*", path = "../tower-buffer" } +tower-limit = { version = "*", path = "../tower-limit" } +tower-test = { version = "*", path = "../tower-test" } diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs index a85c924..fec98b8 100644 --- a/tower-balance/examples/demo.rs +++ b/tower-balance/examples/demo.rs @@ -131,12 +131,13 @@ fn gen_disco() -> impl Discover< ) } -fn run(name: &'static str, lb: lb::p2c::Balance) -> impl Future +fn run(name: &'static str, lb: lb::p2c::Balance) -> impl Future where D: Discover + Send + 'static, D::Error: Into, - D::Key: Send, - D::Service: Service + load::Load + Send, + D::Key: Clone + Send, + D::Service: Service + load::Load + Send, + >::Error: Into, >::Future: Send, ::Metric: std::fmt::Debug, { diff --git a/tower-balance/src/p2c/layer.rs b/tower-balance/src/p2c/layer.rs index ebd43d6..7a92c62 100644 --- a/tower-balance/src/p2c/layer.rs +++ b/tower-balance/src/p2c/layer.rs @@ -5,12 +5,12 @@ use tower_layer::Layer; /// Efficiently distributes requests across an arbitrary number of services #[derive(Clone)] -pub struct BalanceLayer { +pub struct BalanceLayer { rng: SmallRng, - _marker: PhantomData, + _marker: PhantomData, } -impl BalanceLayer { +impl BalanceLayer { /// Builds a balancer using the system entropy. pub fn new() -> Self { Self { @@ -31,15 +31,15 @@ impl BalanceLayer { } } -impl Layer for BalanceLayer { - type Service = BalanceMake; +impl Layer for BalanceLayer { + type Service = BalanceMake; fn layer(&self, make_discover: S) -> Self::Service { BalanceMake::new(make_discover, self.rng.clone()) } } -impl fmt::Debug for BalanceLayer { +impl fmt::Debug for BalanceLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("BalanceLayer") .field("rng", &self.rng) diff --git a/tower-balance/src/p2c/make.rs b/tower-balance/src/p2c/make.rs index cd6addd..dd3c5be 100644 --- a/tower-balance/src/p2c/make.rs +++ b/tower-balance/src/p2c/make.rs @@ -1,25 +1,32 @@ use super::Balance; use futures::{try_ready, Future, Poll}; use rand::{rngs::SmallRng, FromEntropy}; +use std::marker::PhantomData; use tower_discover::Discover; use tower_service::Service; /// Makes `Balancer`s given an inner service that makes `Discover`s. #[derive(Clone, Debug)] -pub struct BalanceMake { +pub struct BalanceMake { inner: S, rng: SmallRng, + _marker: PhantomData, } /// Makes a balancer instance. -pub struct MakeFuture { +pub struct MakeFuture { inner: F, rng: SmallRng, + _marker: PhantomData, } -impl BalanceMake { +impl BalanceMake { pub(crate) fn new(inner: S, rng: SmallRng) -> Self { - Self { inner, rng } + Self { + inner, + rng, + _marker: PhantomData, + } } /// Initializes a P2C load balancer from the OS's entropy source. @@ -28,14 +35,15 @@ impl BalanceMake { } } -impl Service for BalanceMake +impl Service for BalanceMake where S: Service, S::Response: Discover, + ::Service: Service, { - type Response = Balance; + type Response = Balance; type Error = S::Error; - type Future = MakeFuture; + type Future = MakeFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() @@ -45,16 +53,18 @@ where MakeFuture { inner: self.inner.call(target), rng: self.rng.clone(), + _marker: PhantomData, } } } -impl Future for MakeFuture +impl Future for MakeFuture where F: Future, F::Item: Discover, + ::Service: Service, { - type Item = Balance; + type Item = Balance; type Error = F::Error; fn poll(&mut self) -> Poll { diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index c0bd709..d99f66c 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -1,11 +1,13 @@ use crate::error; -use futures::{future, try_ready, Async, Future, Poll}; +use futures::{future, stream, try_ready, Async, Future, Poll, Stream}; use indexmap::IndexMap; -use log::{debug, info, trace}; +use log::{debug, trace}; use rand::{rngs::SmallRng, FromEntropy}; +use tokio_sync::oneshot; use tower_discover::{Change, Discover}; use tower_load::Load; use tower_service::Service; +use tower_util::Ready; /// Distributes requests across inner services using the [Power of Two Choices][p2c]. /// @@ -21,26 +23,50 @@ use tower_service::Service; /// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded /// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf #[derive(Debug)] -pub struct Balance { +pub struct Balance { discover: D, - endpoints: IndexMap, + ready_services: IndexMap, + + unready_services: stream::FuturesUnordered>, + cancelations: IndexMap>, /// Holds an index into `endpoints`, indicating the service that has been /// chosen to dispatch the next request. - ready_index: Option, + next_ready_index: Option, rng: SmallRng, } -impl Balance { +/// A Future that becomes satisfied when an `S`-typed service is ready. +/// +/// May fail due to cancelation, i.e. if the service is removed from discovery. +#[derive(Debug)] +struct UnreadyService { + key: Option, + cancel: oneshot::Receiver<()>, + ready: tower_util::Ready, +} + +enum Error { + Inner(E), + Canceled, +} + +impl Balance +where + D: Discover, + D::Service: Service, +{ /// Initializes a P2C load balancer from the provided randomization source. pub fn new(discover: D, rng: SmallRng) -> Self { Self { rng, discover, - endpoints: IndexMap::default(), - ready_index: None, + ready_services: IndexMap::default(), + cancelations: IndexMap::default(), + unready_services: stream::FuturesUnordered::new(), + next_ready_index: None, } } @@ -51,33 +77,81 @@ impl Balance { /// Returns the number of endpoints currently tracked by the balancer. pub fn len(&self) -> usize { - self.endpoints.len() + self.ready_services.len() + self.unready_services.len() } // XXX `pool::Pool` requires direct access to this... Not ideal. pub(crate) fn discover_mut(&mut self) -> &mut D { &mut self.discover } +} +impl Balance +where + D: Discover, + D::Key: Clone, + D::Error: Into, + D::Service: Service + Load, + ::Metric: std::fmt::Debug, + >::Error: Into, +{ /// Polls `discover` for updates, adding new items to `not_ready`. /// /// Removals may alter the order of either `ready` or `not_ready`. - fn poll_discover(&mut self) -> Poll<(), error::Discover> - where - D::Error: Into, - { + fn poll_discover(&mut self) -> Poll<(), error::Discover> { debug!("updating from discover"); - loop { match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { - Change::Insert(key, svc) => drop(self.endpoints.insert(key, svc)), - Change::Remove(rm_key) => { - // Update the ready index to account for reordering of endpoints. - if let Some((rm_idx, _, _)) = self.endpoints.swap_remove_full(&rm_key) { - self.ready_index = self - .ready_index - .and_then(|i| Self::repair_index(i, rm_idx, self.endpoints.len())); - } + Change::Remove(key) => { + trace!("remove"); + self.evict(&key) + } + Change::Insert(key, svc) => { + trace!("insert"); + self.evict(&key); + self.push_unready(key, svc); + } + } + } + } + + fn push_unready(&mut self, key: D::Key, svc: D::Service) { + let (tx, rx) = oneshot::channel(); + self.cancelations.insert(key.clone(), tx); + self.unready_services.push(UnreadyService { + key: Some(key), + ready: Ready::new(svc), + cancel: rx, + }); + } + + fn evict(&mut self, key: &D::Key) { + // Update the ready index to account for reordering of ready. + if let Some((idx, _, _)) = self.ready_services.swap_remove_full(key) { + self.next_ready_index = self + .next_ready_index + .and_then(|i| Self::repair_index(i, idx, self.ready_services.len())); + debug_assert!(!self.cancelations.contains_key(key)); + } else if let Some(cancel) = self.cancelations.remove(key) { + let _ = cancel.send(()); + } + } + + fn poll_unready(&mut self) { + loop { + match self.unready_services.poll() { + Ok(Async::NotReady) | Ok(Async::Ready(None)) => return, + Ok(Async::Ready(Some((key, svc)))) => { + trace!("endpoint ready"); + let _cancel = self.cancelations.remove(&key); + debug_assert!(_cancel.is_some(), "missing cancelation"); + self.ready_services.insert(key, svc); + } + Err((key, Error::Canceled)) => debug_assert!(!self.cancelations.contains_key(&key)), + Err((key, Error::Inner(e))) => { + debug!("dropping failed endpoint: {:?}", e.into()); + let _cancel = self.cancelations.swap_remove(&key); + debug_assert!(_cancel.is_some()); } } } @@ -106,120 +180,86 @@ impl Balance { } /// Performs P2C on inner services to find a suitable endpoint. - /// - /// When this function returns NotReady, `ready_index` is unset. When this - /// function returns Ready, `ready_index` is set with an index into - /// `endpoints` of a ready endpoint service. - /// - /// If `endpoints` is reordered due to removals, `ready_index` is updated via - /// `repair_index()`. - fn poll_ready_index(&mut self) -> Poll - where - D: Discover, - Svc: Service + Load, - Svc::Error: Into, - Svc::Metric: std::fmt::Debug, - { - match self.endpoints.len() { - 0 => Ok(Async::NotReady), - 1 => { - // If there's only one endpoint, ignore its load but require that it - // is ready. - match self.poll_endpoint_index_load(0) { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => { - self.ready_index = Some(0); - Ok(Async::Ready(0)) - } - Err(e) => { - info!("evicting failed endpoint: {}", e.into()); - let _ = self.endpoints.swap_remove_index(0); - Ok(Async::NotReady) - } - } - } + fn p2c_next_ready_index(&mut self) -> Option { + match self.ready_services.len() { + 0 => None, + 1 => Some(0), len => { - // Get two distinct random indexes (in a random order). Poll the - // service at each index. - // - // If either fails, the service is removed from the set of - // endpoints. + // Get two distinct random indexes (in a random order) and + // compare the loads of the service at each index. let idxs = rand::seq::index::sample(&mut self.rng, len, 2); let aidx = idxs.index(0); let bidx = idxs.index(1); debug_assert_ne!(aidx, bidx, "random indices must be distinct"); - let (aload, bidx) = match self.poll_endpoint_index_load(aidx) { - Ok(ready) => (ready, bidx), - Err(e) => { - info!("evicting failed endpoint: {}", e.into()); - let _ = self.endpoints.swap_remove_index(aidx); - let new_bidx = Self::repair_index(bidx, aidx, self.endpoints.len()) - .expect("random indices must be distinct"); - (Async::NotReady, new_bidx) - } - }; + let aload = self.ready_index_load(aidx); + let bload = self.ready_index_load(bidx); + let ready = if aload <= bload { aidx } else { bidx }; - let (bload, aidx) = match self.poll_endpoint_index_load(bidx) { - Ok(ready) => (ready, aidx), - Err(e) => { - info!("evicting failed endpoint: {}", e.into()); - let _ = self.endpoints.swap_remove_index(bidx); - let new_aidx = Self::repair_index(aidx, bidx, self.endpoints.len()) - .expect("random indices must be distinct"); - (Async::NotReady, new_aidx) - } - }; - - trace!("load[{}]={:?}; load[{}]={:?}", aidx, aload, bidx, bload); - - let ready = match (aload, bload) { - (Async::Ready(aload), Async::Ready(bload)) => { - if aload <= bload { - Async::Ready(aidx) - } else { - Async::Ready(bidx) - } - } - (Async::Ready(_), Async::NotReady) => Async::Ready(aidx), - (Async::NotReady, Async::Ready(_)) => Async::Ready(bidx), - (Async::NotReady, Async::NotReady) => Async::NotReady, - }; - trace!(" -> ready={:?}", ready); - Ok(ready) + trace!( + "load[{}]={:?}; load[{}]={:?} -> ready={}", + aidx, + aload, + bidx, + bload, + ready + ); + Some(ready) } } } - /// Accesses an endpoint by index and, if it is ready, returns its current load. - fn poll_endpoint_index_load( - &mut self, - index: usize, - ) -> Poll - where - D: Discover, - Svc: Service + Load, - Svc::Error: Into, - { - let (_, svc) = self.endpoints.get_index_mut(index).expect("invalid index"); - try_ready!(svc.poll_ready()); - Ok(Async::Ready(svc.load())) + /// Accesses a ready endpoint by index and returns its current load. + fn ready_index_load(&self, index: usize) -> ::Metric { + let (_, svc) = self.ready_services.get_index(index).expect("invalid index"); + svc.load() + } + + fn poll_ready_index_or_evict(&mut self, index: usize) -> Poll<(), ()> { + let (_, svc) = self + .ready_services + .get_index_mut(index) + .expect("invalid index"); + + match svc.poll_ready() { + Ok(Async::Ready(())) => Ok(Async::Ready(())), + Ok(Async::NotReady) => { + // became unready; so move it back there. + let (key, svc) = self + .ready_services + .swap_remove_index(index) + .expect("invalid ready index"); + self.push_unready(key, svc); + Ok(Async::NotReady) + } + Err(e) => { + // failed, so drop it. + debug!("evicting failed endpoint: {:?}", e.into()); + self.ready_services + .swap_remove_index(index) + .expect("invalid ready index"); + Err(()) + } + } } } -impl Service for Balance +impl Service for Balance where - D: Discover, + D: Discover, + D::Key: Clone, D::Error: Into, - Svc: Service + Load, - Svc::Error: Into, - Svc::Metric: std::fmt::Debug, + D::Service: Service + Load, + ::Metric: std::fmt::Debug, + >::Error: Into, { - type Response = >::Response; + type Response = >::Response; type Error = error::Error; - type Future = - future::MapErr<>::Future, fn(Svc::Error) -> error::Error>; + type Future = future::MapErr< + >::Future, + fn(>::Error) -> error::Error, + >; /// Prepares the balancer to process a request. /// @@ -230,45 +270,74 @@ where // previously-selected `ready_index` if appropriate. self.poll_discover()?; - if let Some(index) = self.ready_index { - debug_assert!(index < self.endpoints.len()); - // Ensure the selected endpoint is still ready. - match self.poll_endpoint_index_load(index) { - Ok(Async::Ready(_)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => {} - Err(e) => { - drop(self.endpoints.swap_remove_index(index)); - info!("evicting failed endpoint: {}", e.into()); + // Drive new or busy services to readiness. + self.poll_unready(); + trace!( + "ready={}; unready={}", + self.ready_services.len(), + self.unready_services.len() + ); + + loop { + // If a node has already been selected, ensure that it is ready. + // This ensures that the underlying service is ready immediately + // before a request is dispatched to it. If, e.g., a failure + // detector has changed the state of the service, it may be evicted + // from the ready set so that P2C can be performed again. + if let Some(index) = self.next_ready_index { + trace!("preselected ready_index={}", index); + debug_assert!(index < self.ready_services.len()); + + if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { + return Ok(Async::Ready(())); } + + self.next_ready_index = None; } - self.ready_index = None; - } - - let tries = match self.endpoints.len() { - 0 => return Ok(Async::NotReady), - 1 => 1, - n => n / 2, - }; - for _ in 0..tries { - if let Async::Ready(idx) = self.poll_ready_index().map_err(Into::into)? { - trace!("ready: {:?}", idx); - self.ready_index = Some(idx); - return Ok(Async::Ready(())); + self.next_ready_index = self.p2c_next_ready_index(); + if self.next_ready_index.is_none() { + debug_assert!(self.ready_services.is_empty()); + return Ok(Async::NotReady); } } - - trace!("exhausted {} attempts", tries); - Ok(Async::NotReady) } - fn call(&mut self, request: Request) -> Self::Future { - let index = self.ready_index.take().expect("not ready"); - let (_, svc) = self - .endpoints - .get_index_mut(index) + fn call(&mut self, request: Req) -> Self::Future { + let index = self.next_ready_index.take().expect("not ready"); + let (key, mut svc) = self + .ready_services + .swap_remove_index(index) .expect("invalid ready index"); + // no need to repair since the ready_index has been cleared. - svc.call(request).map_err(Into::into) + let fut = svc.call(request); + self.push_unready(key, svc); + + fut.map_err(Into::into) + } +} + +impl, Req> Future for UnreadyService { + type Item = (K, S); + type Error = (K, Error); + + fn poll(&mut self) -> Poll { + if let Ok(Async::Ready(())) = self.cancel.poll() { + let key = self.key.take().expect("polled after ready"); + return Err((key, Error::Canceled)); + } + + match self.ready.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(svc)) => { + let key = self.key.take().expect("polled after ready"); + Ok((key, svc).into()) + } + Err(e) => { + let key = self.key.take().expect("polled after ready"); + Err((key, Error::Inner(e))) + } + } } } diff --git a/tower-balance/src/p2c/test.rs b/tower-balance/src/p2c/test.rs index 5f86f5f..c33dae8 100644 --- a/tower-balance/src/p2c/test.rs +++ b/tower-balance/src/p2c/test.rs @@ -29,7 +29,9 @@ fn empty() { let empty: Vec, usize>> = vec![]; let disco = ServiceList::new(empty); let mut svc = Balance::from_entropy(disco); - assert_not_ready!(svc); + with_task(|| { + assert_not_ready!(svc); + }) } #[test] @@ -64,7 +66,7 @@ fn single_endpoint() { } #[test] -fn two_endpoints_with_equal_weight() { +fn two_endpoints_with_equal_load() { let (mock_a, mut handle_a) = mock::pair(); let (mock_b, mut handle_b) = mock::pair(); let mock_a = load::Constant::new(mock_a, 1); @@ -101,20 +103,21 @@ fn two_endpoints_with_equal_weight() { handle_a.allow(1); handle_b.allow(1); - assert_ready!(svc, "must be ready when both endpoints are ready"); - { + for _ in 0..2 { + assert_ready!(svc, "must be ready when both endpoints are ready"); let fut = svc.call(()); for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] { if let Async::Ready(Some((_, tx))) = h.poll_request().unwrap() { + log::info!("using {}", c); tx.send_response(c); + h.allow(0); } } fut.wait().expect("call must complete"); } handle_a.send_error("endpoint lost"); - handle_b.allow(1); - assert_ready!(svc, "must be ready after one endpoint is removed"); + assert_not_ready!(svc, "must be not be ready"); assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",); }); } diff --git a/tower-balance/src/pool.rs b/tower-balance/src/pool.rs index 71582e3..93ea8f7 100644 --- a/tower-balance/src/pool.rs +++ b/tower-balance/src/pool.rs @@ -238,7 +238,7 @@ where MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, { - balance: Balance>, + balance: Balance, Request>, options: Builder, ewma: f64, } @@ -263,18 +263,18 @@ where } } -impl Service for Pool +impl Service for Pool where - MS: MakeService, + MS: MakeService, MS::Service: Load, ::Metric: std::fmt::Debug, MS::MakeError: ::std::error::Error + Send + Sync + 'static, MS::Error: ::std::error::Error + Send + Sync + 'static, Target: Clone, { - type Response = > as Service>::Response; - type Error = > as Service>::Error; - type Future = > as Service>::Future; + type Response = , Req> as Service>::Response; + type Error = , Req> as Service>::Error; + type Future = , Req> as Service>::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if let Async::Ready(()) = self.balance.poll_ready()? { @@ -318,7 +318,7 @@ where Ok(Async::NotReady) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { self.balance.call(req) } }