From 793e2e8e94b5ca6c1d12171b3909d78505ab6667 Mon Sep 17 00:00:00 2001
From: Lucio Franco
Date: Tue, 20 Aug 2019 23:20:01 -0400
Subject: [PATCH 1/8] Add a note about v0.3.x branch to the readme (#312)

* Add a note about v0.3.x branch to the readme

Signed-off-by: Lucio Franco

* Fix link

Signed-off-by: Lucio Franco
---
 README.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/README.md b/README.md
index 5ea8b8f..fbc8cc2 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,9 @@
 # Tower
 
+_NOTE_: Tower's [`master`](https://github.com/tower-rs/tower) branch currently uses [`futures 0.1`](https://docs.rs/futures/0.1.28/futures/); please refer to the [`v0.3.x`](https://github.com/tower-rs/tower/tree/v0.3.x) branch to use
+any of the new [`std::future::Future`](https://doc.rust-lang.org/std/future/trait.Future.html)-based libraries. This will stay in effect until we have fully upgraded
+to [`std::future::Future`](https://doc.rust-lang.org/std/future/trait.Future.html).
+
 Tower is a library of modular and reusable components for building robust
 networking clients and servers.

From 8509ab879d791571d48480f7a4b09671b870d59d Mon Sep 17 00:00:00 2001
From: Luke Steensen
Date: Tue, 17 Sep 2019 14:29:11 -0500
Subject: [PATCH 2/8] Fix up broken dependencies and deprecated methods (#347)

* fix up broken dependencies and deprecated methods

* use released version of tracing-subscriber

Co-Authored-By: Lucio Franco
---
 tower-balance/Cargo.toml         | 2 +-
 tower-balance/examples/demo.rs   | 3 ++-
 tower-balance/src/p2c/service.rs | 4 ++--
 tower-buffer/Cargo.toml          | 2 +-
 tower-spawn-ready/Cargo.toml     | 2 +-
 5 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml
index 4db8f66..ce04340 100644
--- a/tower-balance/Cargo.toml
+++ b/tower-balance/Cargo.toml
@@ -41,7 +41,7 @@ tower-util = "0.1.0"
 slab = "0.4"
 
 [dev-dependencies]
-tracing-fmt = { git = "https://github.com/tokio-rs/tracing.git" }
+tracing-subscriber = "0.1.3"
 hdrhistogram = "6.0"
 quickcheck = { version = "0.6", default-features = false }
 tokio = "0.1.7"
diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs
index 8d989d9..6286300 100644
--- a/tower-balance/examples/demo.rs
+++ b/tower-balance/examples/demo.rs
@@ -37,7 +37,8 @@ struct Summary {
 }
 
 fn main() {
-    tracing::subscriber::set_global_default(tracing_fmt::FmtSubscriber::default()).unwrap();
+    tracing::subscriber::set_global_default(tracing_subscriber::fmt::Subscriber::default())
+        .unwrap();
 
     println!("REQUESTS={}", REQUESTS);
     println!("CONCURRENCY={}", CONCURRENCY);
diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs
index b9a9937..91efc5b 100644
--- a/tower-balance/src/p2c/service.rs
+++ b/tower-balance/src/p2c/service.rs
@@ -132,7 +132,7 @@ where
                 .next_ready_index
                 .and_then(|i| Self::repair_index(i, idx, self.ready_services.len()));
             debug_assert!(!self.cancelations.contains_key(key));
-        } else if let Some(cancel) = self.cancelations.remove(key) {
+        } else if let Some(cancel) = self.cancelations.swap_remove(key) {
             let _ = cancel.send(());
         }
     }
@@ -143,7 +143,7 @@ where
             Ok(Async::NotReady) | Ok(Async::Ready(None)) => return,
             Ok(Async::Ready(Some((key, svc)))) => {
                 trace!("endpoint ready");
-                let _cancel = self.cancelations.remove(&key);
+                let _cancel = self.cancelations.swap_remove(&key);
                 debug_assert!(_cancel.is_some(), "missing cancelation");
                 self.ready_services.insert(key, svc);
             }
diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml
index b49f7b1..392bec3 100644
---
a/tower-buffer/Cargo.toml +++ b/tower-buffer/Cargo.toml @@ -30,7 +30,7 @@ futures = "0.1.25" tower-service = "0.2.0" tower-layer = "0.1.0" tokio-executor = "0.1.7" -tokio-sync = "0.1.0" +tokio-sync = "0.1.3" tracing = "0.1.2" [dev-dependencies] diff --git a/tower-spawn-ready/Cargo.toml b/tower-spawn-ready/Cargo.toml index 4c206bd..49aeb22 100644 --- a/tower-spawn-ready/Cargo.toml +++ b/tower-spawn-ready/Cargo.toml @@ -28,7 +28,7 @@ tower-service = "0.2.0" tower-layer = "0.1.0" tower-util = "0.1.0" tokio-executor = "0.1.7" -tokio-sync = "0.1.0" +tokio-sync = "0.1.3" [dev-dependencies] tower = { version = "0.1.0", path = "../tower" } From b86d7fb6e463417c754e1f22febb3d1c512c2dc9 Mon Sep 17 00:00:00 2001 From: Luke Steensen Date: Tue, 17 Sep 2019 16:19:23 -0500 Subject: [PATCH 3/8] limit: Add trace log when rate limit is exceeded (#348) --- tower-limit/Cargo.toml | 1 + tower-limit/src/rate/service.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/tower-limit/Cargo.toml b/tower-limit/Cargo.toml index b9b384e..5a830ae 100644 --- a/tower-limit/Cargo.toml +++ b/tower-limit/Cargo.toml @@ -27,6 +27,7 @@ tower-service = "0.2.0" tower-layer = "0.1.0" tokio-sync = "0.1.3" tokio-timer = "0.2.6" +tracing = "0.1.2" [dev-dependencies] tower-test = { version = "0.1", path = "../tower-test" } diff --git a/tower-limit/src/rate/service.rs b/tower-limit/src/rate/service.rs index ef4427e..062a6a7 100644 --- a/tower-limit/src/rate/service.rs +++ b/tower-limit/src/rate/service.rs @@ -95,6 +95,7 @@ where self.state = State::Ready { until, rem }; } else { // The service is disabled until further notice + tracing::trace!("rate limit exceeded, disabling service"); let sleep = Delay::new(until); self.state = State::Limited(sleep); } From 30f11bfaa2f025c48875789dec4c3c95117f5ef5 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 11 Oct 2019 11:22:14 -0400 Subject: [PATCH 4/8] Prepare limit 0.1.1 release (#359) --- tower-limit/CHANGELOG.md | 4 ++++ tower-limit/Cargo.toml | 4 ++-- tower-limit/src/lib.rs | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tower-limit/CHANGELOG.md b/tower-limit/CHANGELOG.md index 24a6a34..e8751df 100644 --- a/tower-limit/CHANGELOG.md +++ b/tower-limit/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.1.1 (October 11, 2019) + +- Added `tracing` events for when requests are limited + # 0.1.0 (April 26, 2019) - Initial release diff --git a/tower-limit/Cargo.toml b/tower-limit/Cargo.toml index 5a830ae..b6265cb 100644 --- a/tower-limit/Cargo.toml +++ b/tower-limit/Cargo.toml @@ -8,13 +8,13 @@ name = "tower-limit" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.1.0" +version = "0.1.1" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" repository = "https://github.com/tower-rs/tower" homepage = "https://github.com/tower-rs/tower" -documentation = "https://docs.rs/tower-limit/0.1.0" +documentation = "https://docs.rs/tower-limit/0.1.1" description = """ Limit maximum request rate to a `Service`. 
""" diff --git a/tower-limit/src/lib.rs b/tower-limit/src/lib.rs index 97e9fcf..147a372 100644 --- a/tower-limit/src/lib.rs +++ b/tower-limit/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tower-limit/0.1.0")] +#![doc(html_root_url = "https://docs.rs/tower-limit/0.1.1")] #![cfg_attr(test, deny(warnings))] #![deny(missing_debug_implementations, missing_docs, rust_2018_idioms)] #![allow(elided_lifetimes_in_paths)] From e414b2b7d3ce8220aaf321d42af70cf37ec127d0 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 11 Oct 2019 11:39:34 -0400 Subject: [PATCH 5/8] Prepare buffer 0.1.2 release (#360) --- tower-buffer/CHANGELOG.md | 4 ++++ tower-buffer/Cargo.toml | 2 +- tower-buffer/src/lib.rs | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tower-buffer/CHANGELOG.md b/tower-buffer/CHANGELOG.md index 86e7a5a..e69a14a 100644 --- a/tower-buffer/CHANGELOG.md +++ b/tower-buffer/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.1.2 (October 11, 2019) + +- Bump `tokio-sync` to `v0.1.3` + # 0.1.1 (July 19, 2019) - Add `tracing` support diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml index 392bec3..d93786e 100644 --- a/tower-buffer/Cargo.toml +++ b/tower-buffer/Cargo.toml @@ -8,7 +8,7 @@ name = "tower-buffer" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.1.1" +version = "0.1.2" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index 8a68d66..1218686 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tower-buffer/0.1.1")] +#![doc(html_root_url = "https://docs.rs/tower-buffer/0.1.2")] #![deny(rust_2018_idioms)] #![allow(elided_lifetimes_in_paths)] From 4a4593d522bf615b138e18f9fe6fc01e46e7d417 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 9 Nov 2019 14:30:44 -0800 Subject: [PATCH 6/8] balance: Update rand to 0.7 (#363) --- tower-balance/Cargo.toml | 6 +++--- tower-balance/src/p2c/layer.rs | 2 +- tower-balance/src/p2c/make.rs | 2 +- tower-balance/src/p2c/service.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index ce04340..a119f4e 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -29,8 +29,8 @@ default = ["log"] [dependencies] futures = "0.1.26" indexmap = "1.0.2" -tracing = "0.1" -rand = "0.6.5" +rand = { version = "0.7", features = ["small_rng"] } +slab = "0.4" tokio-sync = "0.1.3" tokio-timer = "0.2.4" tower-discover = "0.1.0" @@ -38,7 +38,7 @@ tower-layer = "0.1.0" tower-load = { version = "0.1.0", path = "../tower-load" } tower-service = "0.2.0" tower-util = "0.1.0" -slab = "0.4" +tracing = "0.1.0" [dev-dependencies] tracing-subscriber = "0.1.3" diff --git a/tower-balance/src/p2c/layer.rs b/tower-balance/src/p2c/layer.rs index 7a92c62..a90747a 100644 --- a/tower-balance/src/p2c/layer.rs +++ b/tower-balance/src/p2c/layer.rs @@ -1,5 +1,5 @@ use super::BalanceMake; -use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::{fmt, marker::PhantomData}; use tower_layer::Layer; diff --git a/tower-balance/src/p2c/make.rs b/tower-balance/src/p2c/make.rs index dd3c5be..1940d0b 100644 --- a/tower-balance/src/p2c/make.rs +++ b/tower-balance/src/p2c/make.rs @@ -1,6 +1,6 @@ use super::Balance; use futures::{try_ready, Future, Poll}; -use rand::{rngs::SmallRng, FromEntropy}; +use rand::{rngs::SmallRng, SeedableRng}; use 
std::marker::PhantomData; use tower_discover::Discover; use tower_service::Service; diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 91efc5b..50c27cd 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -1,7 +1,7 @@ use crate::error; use futures::{future, stream, try_ready, Async, Future, Poll, Stream}; use indexmap::IndexMap; -use rand::{rngs::SmallRng, FromEntropy}; +use rand::{rngs::SmallRng, SeedableRng}; use tokio_sync::oneshot; use tower_discover::{Change, Discover}; use tower_load::Load; From 2d24d84e7c71aa5e224ebf9d9723ef64b3ca7238 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 11 Nov 2019 09:52:33 -0800 Subject: [PATCH 7/8] Cleanup unused dependencies (#364) I've run `cargo udeps` to discover some unused/misplaced dependencies. --- tower-hedge/Cargo.toml | 8 ++++---- tower-layer/Cargo.toml | 4 +--- tower/Cargo.toml | 1 - 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/tower-hedge/Cargo.toml b/tower-hedge/Cargo.toml index 9fcf134..3b54f67 100644 --- a/tower-hedge/Cargo.toml +++ b/tower-hedge/Cargo.toml @@ -8,11 +8,11 @@ publish = false futures = "0.1" hdrhistogram = "6.0" log = "0.4.1" -tower-service = "0.2.0" -tower-filter = { version = "0.1", path = "../tower-filter" } -tokio-mock-task = { git = "https://github.com/carllerche/tokio-mock-task" } tokio-timer = "0.2.6" +tower-filter = { version = "0.1", path = "../tower-filter" } +tower-service = "0.2.0" [dev-dependencies] -tower-test = { version = "0.1", path = "../tower-test" } tokio-executor = "0.1.2" +tokio-mock-task = { git = "https://github.com/carllerche/tokio-mock-task" } +tower-test = { version = "0.1", path = "../tower-test" } diff --git a/tower-layer/Cargo.toml b/tower-layer/Cargo.toml index 10aa5b4..0bc33f1 100644 --- a/tower-layer/Cargo.toml +++ b/tower-layer/Cargo.toml @@ -21,9 +21,7 @@ Decorates a `Service` to allow easy composition between `Service`s. categories = ["asynchronous", "network-programming"] edition = "2018" -[dependencies] +[dev-dependencies] futures = "0.1.26" tower-service = "0.2.0" - -[dev-dependencies] void = "1.0.2" diff --git a/tower/Cargo.toml b/tower/Cargo.toml index f72a8c0..a296d8e 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -43,5 +43,4 @@ tower-util = { version = "0.1.0", features = ["io"] } futures = "0.1.26" log = "0.4.1" tokio = "0.1" -env_logger = { version = "0.5.3", default-features = false } void = "1.0.2" From 7e55b7fa0b2db4ff36fd90f3700bd628c89951b6 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 12 Nov 2019 09:44:16 -0800 Subject: [PATCH 8/8] Introduce tower-ready-cache (#303) In #293, `balance` was refactored to manage dispatching requests over a set of equivalent inner services that may or may not be ready. This change extracts the core logic of managing a cache of ready services into a dedicated crate, leaving the balance crate to deal with node selection. 
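A rough sketch of the resulting usage pattern (illustrative only: the `usize`
keys, the `Svc`/`Req` placeholder types, and the surrounding task context are
assumed, not part of this change):

```rust
// Build a cache and add services to its pending set.
let mut cache: ReadyCache<usize, Svc, Req> = ReadyCache::default();
cache.push(0, svc0);
cache.push(1, svc1);

// Drive pending services toward readiness; ready ones move to the ready set.
if let Ok(Async::Ready(())) = cache.poll_pending() {
    // All pushed services are now in the ready set.
}

// Readiness can go stale, so re-verify immediately before dispatching; the
// called service is returned to the pending set afterwards.
if cache.check_ready_index(0).unwrap_or(false) {
    let response_future = cache.call_ready_index(0, request);
}
```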
--- Cargo.toml | 1 + tower-balance/Cargo.toml | 1 + tower-balance/src/p2c/make.rs | 3 + tower-balance/src/p2c/service.rs | 277 ++++++------------ tower-ready-cache/Cargo.toml | 35 +++ tower-ready-cache/src/cache.rs | 374 +++++++++++++++++++++++++ tower-ready-cache/src/error.rs | 28 ++ tower-ready-cache/src/lib.rs | 12 + tower-ready-cache/tests/ready_cache.rs | 81 ++++++ 9 files changed, 623 insertions(+), 189 deletions(-) create mode 100644 tower-ready-cache/Cargo.toml create mode 100644 tower-ready-cache/src/cache.rs create mode 100644 tower-ready-cache/src/error.rs create mode 100644 tower-ready-cache/src/lib.rs create mode 100644 tower-ready-cache/tests/ready_cache.rs diff --git a/Cargo.toml b/Cargo.toml index 9b63ae3..5ee0a3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "tower-limit", "tower-load", "tower-load-shed", + "tower-ready-cache", "tower-reconnect", "tower-retry", "tower-service", diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml index a119f4e..304c3bf 100644 --- a/tower-balance/Cargo.toml +++ b/tower-balance/Cargo.toml @@ -36,6 +36,7 @@ tokio-timer = "0.2.4" tower-discover = "0.1.0" tower-layer = "0.1.0" tower-load = { version = "0.1.0", path = "../tower-load" } +tower-ready-cache = { version = "0.1.0", path = "../tower-ready-cache" } tower-service = "0.2.0" tower-util = "0.1.0" tracing = "0.1.0" diff --git a/tower-balance/src/p2c/make.rs b/tower-balance/src/p2c/make.rs index 1940d0b..5bd0f57 100644 --- a/tower-balance/src/p2c/make.rs +++ b/tower-balance/src/p2c/make.rs @@ -1,4 +1,5 @@ use super::Balance; +use crate::error; use futures::{try_ready, Future, Poll}; use rand::{rngs::SmallRng, SeedableRng}; use std::marker::PhantomData; @@ -40,6 +41,7 @@ where S: Service, S::Response: Discover, ::Service: Service, + <::Service as Service>::Error: Into, { type Response = Balance; type Error = S::Error; @@ -63,6 +65,7 @@ where F: Future, F::Item: Discover, ::Service: Service, + <::Service as Service>::Error: Into, { type Item = Balance; type Error = F::Error; diff --git a/tower-balance/src/p2c/service.rs b/tower-balance/src/p2c/service.rs index 50c27cd..eff3814 100644 --- a/tower-balance/src/p2c/service.rs +++ b/tower-balance/src/p2c/service.rs @@ -1,12 +1,10 @@ use crate::error; -use futures::{future, stream, try_ready, Async, Future, Poll, Stream}; -use indexmap::IndexMap; +use futures::{future, Async, Future, Poll}; use rand::{rngs::SmallRng, SeedableRng}; -use tokio_sync::oneshot; use tower_discover::{Change, Discover}; use tower_load::Load; +use tower_ready_cache::{error::Failed, ReadyCache}; use tower_service::Service; -use tower_util::Ready; use tracing::{debug, trace}; /// Distributes requests across inner services using the [Power of Two Choices][p2c]. @@ -26,47 +24,25 @@ use tracing::{debug, trace}; pub struct Balance { discover: D, - ready_services: IndexMap, - - unready_services: stream::FuturesUnordered>, - cancelations: IndexMap>, - - /// Holds an index into `endpoints`, indicating the service that has been - /// chosen to dispatch the next request. - next_ready_index: Option, + services: ReadyCache, + ready_index: Option, rng: SmallRng, } -/// A Future that becomes satisfied when an `S`-typed service is ready. -/// -/// May fail due to cancelation, i.e. if the service is removed from discovery. 
-#[derive(Debug)] -struct UnreadyService { - key: Option, - cancel: oneshot::Receiver<()>, - ready: tower_util::Ready, -} - -enum Error { - Inner(E), - Canceled, -} - impl Balance where D: Discover, D::Service: Service, + >::Error: Into, { /// Initializes a P2C load balancer from the provided randomization source. pub fn new(discover: D, rng: SmallRng) -> Self { Self { rng, discover, - ready_services: IndexMap::default(), - cancelations: IndexMap::default(), - unready_services: stream::FuturesUnordered::new(), - next_ready_index: None, + ready_index: None, + services: ReadyCache::default(), } } @@ -77,10 +53,9 @@ where /// Returns the number of endpoints currently tracked by the balancer. pub fn len(&self) -> usize { - self.ready_services.len() + self.unready_services.len() + self.services.len() } - // XXX `pool::Pool` requires direct access to this... Not ideal. pub(crate) fn discover_mut(&mut self) -> &mut D { &mut self.discover } @@ -98,88 +73,59 @@ where /// Polls `discover` for updates, adding new items to `not_ready`. /// /// Removals may alter the order of either `ready` or `not_ready`. - fn poll_discover(&mut self) -> Poll<(), error::Discover> { + fn update_pending_from_discover(&mut self) -> Result<(), error::Discover> { debug!("updating from discover"); loop { - match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) { - Change::Remove(key) => { + match self + .discover + .poll() + .map_err(|e| error::Discover(e.into()))? + { + Async::NotReady => return Ok(()), + Async::Ready(Change::Remove(key)) => { trace!("remove"); - self.evict(&key) + self.services.evict(&key); } - Change::Insert(key, svc) => { + Async::Ready(Change::Insert(key, svc)) => { trace!("insert"); - self.evict(&key); - self.push_unready(key, svc); + // If this service already existed in the set, it will be + // replaced as the new one becomes ready. + self.services.push(key, svc); } } } } - fn push_unready(&mut self, key: D::Key, svc: D::Service) { - let (tx, rx) = oneshot::channel(); - self.cancelations.insert(key.clone(), tx); - self.unready_services.push(UnreadyService { - key: Some(key), - ready: Ready::new(svc), - cancel: rx, - }); - } - - fn evict(&mut self, key: &D::Key) { - // Update the ready index to account for reordering of ready. - if let Some((idx, _, _)) = self.ready_services.swap_remove_full(key) { - self.next_ready_index = self - .next_ready_index - .and_then(|i| Self::repair_index(i, idx, self.ready_services.len())); - debug_assert!(!self.cancelations.contains_key(key)); - } else if let Some(cancel) = self.cancelations.swap_remove(key) { - let _ = cancel.send(()); - } - } - - fn poll_unready(&mut self) { + fn promote_pending_to_ready(&mut self) { loop { - match self.unready_services.poll() { - Ok(Async::NotReady) | Ok(Async::Ready(None)) => return, - Ok(Async::Ready(Some((key, svc)))) => { - trace!("endpoint ready"); - let _cancel = self.cancelations.swap_remove(&key); - debug_assert!(_cancel.is_some(), "missing cancelation"); - self.ready_services.insert(key, svc); + match self.services.poll_pending() { + Ok(Async::Ready(())) => { + // There are no remaining pending services. 
+ debug_assert_eq!(self.services.pending_len(), 0); + break; } - Err((key, Error::Canceled)) => debug_assert!(!self.cancelations.contains_key(&key)), - Err((key, Error::Inner(e))) => { - let error = e.into(); - debug!({ %error }, "dropping failed endpoint"); - let _cancel = self.cancelations.swap_remove(&key); - debug_assert!(_cancel.is_some()); + Ok(Async::NotReady) => { + // None of the pending services are ready. + debug_assert!(self.services.pending_len() > 0); + break; + } + Err(error) => { + // An individual service was lost; continue processing + // pending services. + debug!(%error, "dropping failed endpoint"); } } } - } - - // Returns the updated index of `orig_idx` after the entry at `rm_idx` was - // swap-removed from an IndexMap with `orig_sz` items. - // - // If `orig_idx` is the same as `rm_idx`, None is returned to indicate that - // index cannot be repaired. - fn repair_index(orig_idx: usize, rm_idx: usize, new_sz: usize) -> Option { - debug_assert!(orig_idx <= new_sz && rm_idx <= new_sz); - let repaired = match orig_idx { - i if i == rm_idx => None, // removed - i if i == new_sz => Some(rm_idx), // swapped - i => Some(i), // uneffected - }; trace!( - { next.idx = orig_idx, removed.idx = rm_idx, length = new_sz, repaired.idx = ?repaired }, - "repairing index" + ready = %self.services.ready_len(), + pending = %self.services.pending_len(), + "poll_unready" ); - repaired } /// Performs P2C on inner services to find a suitable endpoint. - fn p2c_next_ready_index(&mut self) -> Option { - match self.ready_services.len() { + fn p2c_ready_index(&mut self) -> Option { + match self.services.ready_len() { 0 => None, 1 => Some(0), len => { @@ -193,48 +139,26 @@ where let aload = self.ready_index_load(aidx); let bload = self.ready_index_load(bidx); - let ready = if aload <= bload { aidx } else { bidx }; + let chosen = if aload <= bload { aidx } else { bidx }; - trace!({ a.idx = aidx, a.load = ?aload, b.idx = bidx, b.load = ?bload, ready = ?ready }, "choosing by load"); - Some(ready) + trace!( + a.index = aidx, + a.load = ?aload, + b.index = bidx, + b.load = ?bload, + chosen = if chosen == aidx { "a" } else { "b" }, + "p2c", + ); + Some(chosen) } } } /// Accesses a ready endpoint by index and returns its current load. fn ready_index_load(&self, index: usize) -> ::Metric { - let (_, svc) = self.ready_services.get_index(index).expect("invalid index"); + let (_, svc) = self.services.get_ready_index(index).expect("invalid index"); svc.load() } - - fn poll_ready_index_or_evict(&mut self, index: usize) -> Poll<(), ()> { - let (_, svc) = self - .ready_services - .get_index_mut(index) - .expect("invalid index"); - - match svc.poll_ready() { - Ok(Async::Ready(())) => Ok(Async::Ready(())), - Ok(Async::NotReady) => { - // became unready; so move it back there. - let (key, svc) = self - .ready_services - .swap_remove_index(index) - .expect("invalid ready index"); - self.push_unready(key, svc); - Ok(Async::NotReady) - } - Err(e) => { - // failed, so drop it. - let error = e.into(); - debug!({ %error }, "evicting failed endpoint"); - self.ready_services - .swap_remove_index(index) - .expect("invalid ready index"); - Err(()) - } - } - } } impl Service for Balance @@ -253,79 +177,54 @@ where fn(>::Error) -> error::Error, >; - /// Prepares the balancer to process a request. - /// - /// When `Async::Ready` is returned, `ready_index` is set with a valid index - /// into `ready` referring to a `Service` that is ready to disptach a request. 
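+    /// Prepares the balancer to process a request.
+    ///
+    /// When `Async::Ready` is returned, `ready_index` refers to a service in
+    /// the ready set that has just been verified ready to dispatch a request.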
fn poll_ready(&mut self) -> Poll<(), Self::Error> { - // First and foremost, process discovery updates. This removes or updates a - // previously-selected `ready_index` if appropriate. - self.poll_discover()?; - - // Drive new or busy services to readiness. - self.poll_unready(); - trace!({ nready = self.ready_services.len(), nunready = self.unready_services.len() }, "poll_ready"); + // `ready_index` may have already been set by a prior invocation. These + // updates cannot disturb the order of existing ready services. + self.update_pending_from_discover()?; + self.promote_pending_to_ready(); loop { - // If a node has already been selected, ensure that it is ready. + // If a service has already been selected, ensure that it is ready. // This ensures that the underlying service is ready immediately - // before a request is dispatched to it. If, e.g., a failure - // detector has changed the state of the service, it may be evicted - // from the ready set so that P2C can be performed again. - if let Some(index) = self.next_ready_index { - trace!({ next.idx = index }, "preselected ready_index"); - debug_assert!(index < self.ready_services.len()); - - if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { - return Ok(Async::Ready(())); + // before a request is dispatched to it (i.e. in the same task + // invocation). If, e.g., a failure detector has changed the state + // of the service, it may be evicted from the ready set so that + // another service can be selected. + if let Some(index) = self.ready_index.take() { + match self.services.check_ready_index(index) { + Ok(true) => { + // The service remains ready. + self.ready_index = Some(index); + return Ok(Async::Ready(())); + } + Ok(false) => { + // The service is no longer ready. Try to find a new one. + trace!("ready service became unavailable"); + } + Err(Failed(_, error)) => { + // The ready endpoint failed, so log the error and try + // to find a new one. + debug!(%error, "endpoint failed"); + } } - - self.next_ready_index = None; } - self.next_ready_index = self.p2c_next_ready_index(); - if self.next_ready_index.is_none() { - debug_assert!(self.ready_services.is_empty()); + // Select a new service by comparing two at random and using the + // lesser-loaded service. + self.ready_index = self.p2c_ready_index(); + if self.ready_index.is_none() { + debug_assert_eq!(self.services.ready_len(), 0); + // We have previously registered interest in updates from + // discover and pending services. return Ok(Async::NotReady); } } } fn call(&mut self, request: Req) -> Self::Future { - let index = self.next_ready_index.take().expect("not ready"); - let (key, mut svc) = self - .ready_services - .swap_remove_index(index) - .expect("invalid ready index"); - // no need to repair since the ready_index has been cleared. 
-
-        let fut = svc.call(request);
-        self.push_unready(key, svc);
-
-        fut.map_err(Into::into)
-    }
-}
-
-impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
-    type Item = (K, S);
-    type Error = (K, Error<S::Error>);
-
-    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        if let Ok(Async::Ready(())) = self.cancel.poll() {
-            let key = self.key.take().expect("polled after ready");
-            return Err((key, Error::Canceled));
-        }
-
-        match self.ready.poll() {
-            Ok(Async::NotReady) => Ok(Async::NotReady),
-            Ok(Async::Ready(svc)) => {
-                let key = self.key.take().expect("polled after ready");
-                Ok((key, svc).into())
-            }
-            Err(e) => {
-                let key = self.key.take().expect("polled after ready");
-                Err((key, Error::Inner(e)))
-            }
-        }
+        let index = self.ready_index.take().expect("called before ready");
+        self.services
+            .call_ready_index(index, request)
+            .map_err(Into::into)
     }
 }
diff --git a/tower-ready-cache/Cargo.toml b/tower-ready-cache/Cargo.toml
new file mode 100644
index 0000000..4403bde
--- /dev/null
+++ b/tower-ready-cache/Cargo.toml
@@ -0,0 +1,35 @@
+[package]
+name = "tower-ready-cache"
+# When releasing to crates.io:
+# - Remove path dependencies
+# - Update html_root_url.
+# - Update doc url
+#   - Cargo.toml
+#   - README.md
+# - Update CHANGELOG.md.
+# - Create "v0.1.x" git tag.
+version = "0.1.0"
+authors = ["Tower Maintainers <team@tower-rs.com>"]
+license = "MIT"
+readme = "README.md"
+repository = "https://github.com/tower-rs/tower"
+homepage = "https://github.com/tower-rs/tower"
+documentation = "https://docs.rs/tower-ready-cache/0.1.0"
+description = """
+Caches a set of services
+"""
+categories = ["asynchronous", "network-programming"]
+edition = "2018"
+publish = false
+
+[dependencies]
+futures = "0.1.26"
+indexmap = "1.0.2"
+log = "0.4.1"
+tokio-sync = "0.1.3"
+tower-service = "0.2.0"
+tower-util = "0.1.0"
+
+[dev-dependencies]
+tokio-executor = "0.1.7"
+tower-test = { version = "0.1.0", path = "../tower-test" }
diff --git a/tower-ready-cache/src/cache.rs b/tower-ready-cache/src/cache.rs
new file mode 100644
index 0000000..9236dcc
--- /dev/null
+++ b/tower-ready-cache/src/cache.rs
@@ -0,0 +1,374 @@
+//! A cache of services.
+
+use crate::error;
+use futures::{stream, Async, Future, Poll, Stream};
+pub use indexmap::Equivalent;
+use indexmap::IndexMap;
+use log::{debug, trace};
+use std::hash::Hash;
+use tokio_sync::oneshot;
+use tower_service::Service;
+
+/// Drives readiness over a set of services.
+///
+/// The cache maintains two internal data structures:
+///
+/// * a set of _pending_ services that have not yet become ready; and
+/// * a set of _ready_ services that have previously polled ready.
+///
+/// As each `S`-typed `Service` is added to the cache via `ReadyCache::push`, it
+/// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked,
+/// pending services are polled and added to the _ready set_.
+///
+/// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a
+/// request to the specified service, but panics if the specified service is not
+/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
+/// that a service is ready before dispatching a request.
+///
+/// The ready set can hold services for an arbitrarily long time. During this
+/// time, the runtime may process events that invalidate that ready state (for
+/// instance, if a keepalive detects a lost connection). In such cases, callers
+/// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`)
+/// immediately before dispatching a request to ensure that the service has not
+/// become unavailable.
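+///
+/// A sketch of this check-then-call pattern (assuming a `usize`-keyed cache
+/// polled from a task context; `key` and `request` are placeholders):
+///
+/// ```ignore
+/// if cache.check_ready(&key)? {
+///     // `call_ready` panics if `key` is not in the ready set, so it is
+///     // gated on `check_ready` having returned `Ok(true)`.
+///     let response_future = cache.call_ready(&key, request);
+/// }
+/// ```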
+///
+/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
+/// the _pending_ set to be driven to readiness again.
+///
+/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
+/// specified service is _not_ ready. If an error is returned, this indicates that
+/// the service failed and has been removed from the cache entirely.
+///
+/// `ReadyCache::evict` can be used to remove a service from the cache (by key),
+/// though the service may not be dropped (if it is currently pending) until
+/// `ReadyCache::poll_pending` is invoked.
+///
+/// Note that the by-index accessors are provided to support use cases (like
+/// power-of-two-choices load balancing) where the caller does not care to keep
+/// track of each service's key. Instead, it needs only to access _some_ ready
+/// service. In such a case, it should be noted that calls to
+/// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of
+/// the ready set, so any cached indexes should be discarded after such a call.
+#[derive(Debug)]
+pub struct ReadyCache<K, S, Req>
+where
+    K: Eq + Hash,
+{
+    /// A stream of services that are not yet ready.
+    pending: stream::FuturesUnordered<Pending<K, S, Req>>,
+    /// An index of cancelation handles for pending streams.
+    pending_cancel_txs: IndexMap<K, CancelTx>,
+
+    /// Services that have previously become ready. Readiness can become stale,
+    /// so a given service should be polled immediately before use.
+    ///
+    /// The cancelation oneshot is preserved (though unused) while the service is
+    /// ready so that it need not be reallocated each time a request is
+    /// dispatched.
+    ready: IndexMap<K, (S, CancelPair)>,
+}
+
+type CancelRx = oneshot::Receiver<()>;
+type CancelTx = oneshot::Sender<()>;
+type CancelPair = (CancelTx, CancelRx);
+
+#[derive(Debug)]
+enum PendingError<K, E> {
+    Canceled(K),
+    Inner(K, E),
+}
+
+/// A Future that becomes satisfied when an `S`-typed service is ready.
+///
+/// May fail due to cancelation, i.e. if the service is evicted from the cache.
+#[derive(Debug)]
+struct Pending<K, S, Req> {
+    key: Option<K>,
+    cancel: Option<CancelRx>,
+    ready: tower_util::Ready<S, Req>,
+}
+
+// === ReadyCache ===
+
+impl<K, S, Req> Default for ReadyCache<K, S, Req>
+where
+    K: Eq + Hash,
+    S: Service<Req>,
+{
+    fn default() -> Self {
+        Self {
+            ready: IndexMap::default(),
+            pending: stream::FuturesUnordered::new(),
+            pending_cancel_txs: IndexMap::default(),
+        }
+    }
+}
+
+impl<K, S, Req> ReadyCache<K, S, Req>
+where
+    K: Eq + Hash,
+{
+    /// Returns the total number of services in the cache.
+    pub fn len(&self) -> usize {
+        self.ready_len() + self.pending_len()
+    }
+
+    /// Returns the number of services in the ready set.
+    pub fn ready_len(&self) -> usize {
+        self.ready.len()
+    }
+
+    /// Returns the number of services in the unready set.
+    pub fn pending_len(&self) -> usize {
+        self.pending.len()
+    }
+
+    /// Returns true iff the given key is in the unready set.
+    pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
+        self.pending_cancel_txs.contains_key(key)
+    }
+
+    /// Obtains a reference to a service in the ready set by key.
+    pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
+        self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
+    }
+
+    /// Obtains a mutable reference to a service in the ready set by key.
+    pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
+        &mut self,
+        key: &Q,
+    ) -> Option<(usize, &K, &mut S)> {
+        self.ready
+            .get_full_mut(key)
+            .map(|(i, k, v)| (i, k, &mut v.0))
+    }
+
+    /// Obtains a reference to a service in the ready set by index.
+    pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
+        self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
+    }
+
+    /// Obtains a mutable reference to a service in the ready set by index.
+    pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
+        self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
+    }
+
+    /// Evicts an item from the cache.
+    ///
+    /// Returns true if a service was marked for eviction.
+    ///
+    /// Services are dropped from the ready set immediately. Services in the
+    /// pending set are marked for cancellation, but `ReadyCache::poll_pending`
+    /// must be called to cause the service to be dropped.
+    pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
+        let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
+            c.send(()).expect("cancel receiver lost");
+            true
+        } else {
+            false
+        };
+
+        self.ready
+            .swap_remove_full(key)
+            .map(|_| true)
+            .unwrap_or(canceled)
+    }
+}
+
+impl<K, S, Req> ReadyCache<K, S, Req>
+where
+    K: Clone + Eq + Hash,
+    S: Service<Req>,
+    <S as Service<Req>>::Error: Into<error::Error>,
+    S::Error: Into<error::Error>,
+{
+    /// Pushes a new service onto the pending set.
+    ///
+    /// The service will be promoted to the ready set as `poll_pending` is invoked.
+    ///
+    /// Note that this does **not** remove services from the ready set. Once the
+    /// old service is used, it will be dropped instead of being added back to
+    /// the pending set; OR, when the new service becomes ready, it will replace
+    /// the prior service in the ready set.
+    pub fn push(&mut self, key: K, svc: S) {
+        let cancel = oneshot::channel();
+        self.push_pending(key, svc, cancel);
+    }
+
+    fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
+        if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
+            // If there is already a service for this key, cancel it.
+            c.send(()).expect("cancel receiver lost");
+        }
+        self.pending.push(Pending {
+            key: Some(key),
+            cancel: Some(cancel_rx),
+            ready: tower_util::Ready::new(svc),
+        });
+    }
+
+    /// Polls services pending readiness, adding ready services to the ready set.
+    ///
+    /// Returns `Async::Ready` when there are no remaining unready services.
+    /// `poll_pending` should be called again after `push` or
+    /// `call_ready_index` is invoked.
+    ///
+    /// Failures indicate that an individual pending service failed to become
+    /// ready (and has been removed from the cache). In such a case,
+    /// `poll_pending` should typically be called again to continue driving
+    /// pending services to readiness.
+    pub fn poll_pending(&mut self) -> Poll<(), error::Failed<K>> {
+        loop {
+            match self.pending.poll() {
+                Ok(Async::NotReady) => return Ok(Async::NotReady),
+                Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
+                Ok(Async::Ready(Some((key, svc, cancel_rx)))) => {
+                    trace!("endpoint ready");
+                    let cancel_tx = self
+                        .pending_cancel_txs
+                        .swap_remove(&key)
+                        .expect("missing cancelation");
+                    // Keep track of the cancelation so that it need not be
+                    // recreated after the service is used.
+                    self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
+                }
+                Err(PendingError::Canceled(_)) => {
+                    debug!("endpoint canceled");
+                    // The cancellation for this service was removed in order to
+                    // cause this cancellation.
+                }
+                Err(PendingError::Inner(key, e)) => {
+                    self.pending_cancel_txs
+                        .swap_remove(&key)
+                        .expect("missing cancelation");
+                    return Err(error::Failed(key, e.into()));
+                }
+            }
+        }
+    }
+
+    /// Checks whether the referenced endpoint is ready.
+    ///
+    /// Returns true if the endpoint is ready and false if it is not.
+    /// An error is returned if the endpoint fails.
+    pub fn check_ready<Q: Hash + Equivalent<K>>(
+        &mut self,
+        key: &Q,
+    ) -> Result<bool, error::Failed<K>> {
+        match self.ready.get_full_mut(key) {
+            Some((index, _, _)) => self.check_ready_index(index),
+            None => Ok(false),
+        }
+    }
+
+    /// Checks whether the referenced endpoint is ready.
+    ///
+    /// If the service is no longer ready, it is moved back into the pending set
+    /// and `false` is returned.
+    ///
+    /// If the service errors, it is removed and dropped and the error is returned.
+    pub fn check_ready_index(&mut self, index: usize) -> Result<bool, error::Failed<K>> {
+        let svc = match self.ready.get_index_mut(index) {
+            None => return Ok(false),
+            Some((_, (svc, _))) => svc,
+        };
+        match svc.poll_ready() {
+            Ok(Async::Ready(())) => Ok(true),
+            Ok(Async::NotReady) => {
+                // became unready; so move it back there.
+                let (key, (svc, cancel)) = self
+                    .ready
+                    .swap_remove_index(index)
+                    .expect("invalid ready index");
+
+                // If a new version of this service has been added to the
+                // unready set, don't overwrite it.
+                if !self.pending_contains(&key) {
+                    self.push_pending(key, svc, cancel);
+                }
+
+                Ok(false)
+            }
+            Err(e) => {
+                // failed, so drop it.
+                let (key, _) = self
+                    .ready
+                    .swap_remove_index(index)
+                    .expect("invalid ready index");
+                Err(error::Failed(key, e.into()))
+            }
+        }
+    }
+
+    /// Calls a ready service by key.
+    ///
+    /// # Panics
+    ///
+    /// If the specified key does not exist in the ready set.
+    pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
+        let (index, _, _) = self
+            .ready
+            .get_full_mut(key)
+            .expect("check_ready was not called");
+        self.call_ready_index(index, req)
+    }
+
+    /// Calls a ready service by index.
+    ///
+    /// # Panics
+    ///
+    /// If the specified index is out of range.
+    pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
+        let (key, (mut svc, cancel)) = self
+            .ready
+            .swap_remove_index(index)
+            .expect("check_ready_index was not called");
+
+        let fut = svc.call(req);
+
+        // If a new version of this service has been added to the
+        // unready set, don't overwrite it.
+        if !self.pending_contains(&key) {
+            self.push_pending(key, svc, cancel);
+        }
+
+        fut
+    }
+}
+
+// === Pending ===
+
+impl<K, S, Req> Future for Pending<K, S, Req>
+where
+    S: Service<Req>,
+{
+    type Item = (K, S, CancelRx);
+    type Error = PendingError<K, S::Error>;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        if self
+            .cancel
+            .as_mut()
+            .expect("polled after complete")
+            .poll()
+            .expect("cancel sender lost")
+            .is_ready()
+        {
+            let key = self.key.take().expect("polled after complete");
+            return Err(PendingError::Canceled(key));
+        }
+
+        match self.ready.poll() {
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Ok(Async::Ready(svc)) => {
+                let key = self.key.take().expect("polled after complete");
+                let cancel = self.cancel.take().expect("polled after complete");
+                Ok((key, svc, cancel).into())
+            }
+            Err(e) => {
+                let key = self.key.take().expect("polled after complete");
+                Err(PendingError::Inner(key, e))
+            }
+        }
+    }
+}
diff --git a/tower-ready-cache/src/error.rs b/tower-ready-cache/src/error.rs
new file mode 100644
index 0000000..72804d0
--- /dev/null
+++ b/tower-ready-cache/src/error.rs
@@ -0,0 +1,28 @@
+//! Errors
+
+/// A generic error type.
+pub type Error = Box<dyn std::error::Error + Send + Sync>;
+
+/// An error indicating that the service with a `K`-typed key failed with an
+/// error.
+pub struct Failed<K>(pub K, pub Error);
+
+// === Failed ===
+
+impl<K> std::fmt::Debug for Failed<K> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        std::fmt::Debug::fmt(&self.1, f)
+    }
+}
+
+impl<K> std::fmt::Display for Failed<K> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.1.fmt(f)
+    }
+}
+
+impl<K> std::error::Error for Failed<K> {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        self.1.source()
+    }
+}
diff --git a/tower-ready-cache/src/lib.rs b/tower-ready-cache/src/lib.rs
new file mode 100644
index 0000000..0792404
--- /dev/null
+++ b/tower-ready-cache/src/lib.rs
@@ -0,0 +1,12 @@
+//! A cache of services
+
+#![doc(html_root_url = "https://docs.rs/tower-ready-cache/0.1.0")]
+#![deny(missing_docs)]
+#![deny(rust_2018_idioms)]
+#![allow(elided_lifetimes_in_paths)]
+#![deny(warnings)]
+
+pub mod cache;
+pub mod error;
+
+pub use self::cache::ReadyCache;
diff --git a/tower-ready-cache/tests/ready_cache.rs b/tower-ready-cache/tests/ready_cache.rs
new file mode 100644
index 0000000..c172319
--- /dev/null
+++ b/tower-ready-cache/tests/ready_cache.rs
@@ -0,0 +1,81 @@
+use futures::prelude::*;
+use tower_ready_cache::{error, ReadyCache};
+use tower_test::mock;
+
+fn with_task<F: Fn() -> U, U>(f: F) -> U {
+    use futures::future::lazy;
+    lazy(|| Ok::<_, ()>(f())).wait().unwrap()
+}
+
+type Req = &'static str;
+type Mock = mock::Mock<Req, Req>;
+
+#[test]
+fn poll_ready_inner_failure() {
+    let mut cache = ReadyCache::<usize, Mock, Req>::default();
+
+    let (service0, mut handle0) = mock::pair::<Req, Req>();
+    handle0.send_error("doom");
+    cache.push(0, service0);
+
+    let (service1, mut handle1) = mock::pair::<Req, Req>();
+    handle1.allow(1);
+    cache.push(1, service1);
+
+    with_task(|| {
+        let error::Failed(key, err) = cache
+            .poll_pending()
+            .err()
+            .expect("poll_ready should fail when exhausted");
+        assert_eq!(key, 0);
+        assert_eq!(format!("{}", err), "doom");
+    });
+
+    assert_eq!(cache.len(), 1);
+}
+
+#[test]
+fn poll_ready_not_ready() {
+    let mut cache = ReadyCache::<usize, Mock, Req>::default();
+
+    let (service0, mut handle0) = mock::pair::<Req, Req>();
+    handle0.allow(0);
+    cache.push(0, service0);
+
+    let (service1, mut handle1) = mock::pair::<Req, Req>();
+    handle1.allow(0);
+    cache.push(1, service1);
+
+    with_task(|| {
+        assert!(cache.poll_pending().expect("must succeed").is_not_ready());
+    });
+
+    assert_eq!(cache.ready_len(), 0);
+    assert_eq!(cache.pending_len(), 2);
+    assert_eq!(cache.len(), 2);
+}
+
+#[test]
+fn poll_ready_promotes_inner() {
+    let mut cache = ReadyCache::<usize, Mock, Req>::default();
+
+    let (service0, mut handle0) = mock::pair::<Req, Req>();
+    handle0.allow(1);
+    cache.push(0, service0);
+
+    let (service1, mut handle1) = mock::pair::<Req, Req>();
+    handle1.allow(1);
+    cache.push(1, service1);
+
+    assert_eq!(cache.ready_len(), 0);
+    assert_eq!(cache.pending_len(), 2);
+    assert_eq!(cache.len(), 2);
+
+    with_task(|| {
+        assert!(cache.poll_pending().expect("must succeed").is_ready());
+    });
+
+    assert_eq!(cache.ready_len(), 2);
+    assert_eq!(cache.pending_len(), 0);
+    assert_eq!(cache.len(), 2);
+}
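For reference, an illustrative sketch (not from the patch) exercising the
deferred eviction described in the `cache.rs` docs, reusing the `Mock`/`Req`
aliases and `with_task` helper above:

```rust
#[test]
fn evicts_pending_service_on_poll() {
    let mut cache = ReadyCache::<usize, Mock, Req>::default();

    let (service0, mut handle0) = mock::pair::<Req, Req>();
    handle0.allow(0); // keep the service in the pending set
    cache.push(0, service0);
    assert_eq!(cache.pending_len(), 1);

    // The pending service is marked for cancellation immediately...
    assert!(cache.evict(&0));

    // ...but it is only dropped once `poll_pending` is driven again.
    with_task(|| {
        assert!(cache.poll_pending().expect("must succeed").is_ready());
    });
    assert_eq!(cache.len(), 0);
}
```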