Merge remote-tracking branch 'origin/master' into v0.3.x

Lucio Franco 2019-11-26 10:32:02 -05:00
commit 87ad2e1cc8
11 changed files with 579 additions and 41 deletions

View File

@@ -11,6 +11,7 @@ members = [
"tower-limit",
"tower-load",
"tower-load-shed",
"tower-ready-cache",
"tower-reconnect",
"tower-retry",
"tower-service",

View File

@@ -1,5 +1,9 @@
# Tower
_NOTE_: Tower's [`master`](https://github.com/tower-rs/tower) branch currently uses [`futures 0.1`](https://docs.rs/futures/0.1.28/futures/). Please refer to the [`v0.3.x`](https://github.com/tower-rs/tower/tree/v0.3.x) branch to use
any of the new [`std::future::Future`](https://doc.rust-lang.org/std/future/trait.Future.html)-based libraries. This will stay in effect until we have fully upgraded
to [`std::future::Future`](https://doc.rust-lang.org/std/future/trait.Future.html).

Tower is a library of modular and reusable components for building robust
networking clients and servers.

View File

@@ -1,5 +1,5 @@
use super::BalanceMake;
use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;

View File

@@ -1,7 +1,13 @@
use super::Balance;
<<<<<<< HEAD
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy};
=======
use crate::error;
use futures::{try_ready, Future, Poll};
use rand::{rngs::SmallRng, SeedableRng};
>>>>>>> origin/master
use std::marker::PhantomData;
use std::{
future::Future,
@@ -49,6 +55,7 @@ where
S: Service<Target>,
S::Response: Discover,
<S::Response as Discover>::Service: Service<Req>,
<<S::Response as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
{
type Response = Balance<S::Response, Req>;
type Error = S::Error;

View File

@@ -1,4 +1,5 @@
use crate::error;
<<<<<<< HEAD
use futures_core::{ready, Stream};
use futures_util::{stream, try_future, try_future::TryFutureExt};
use indexmap::IndexMap;
@@ -12,8 +13,13 @@ use std::{
task::{Context, Poll},
};
use tokio_sync::oneshot;
=======
use futures::{future, Async, Future, Poll};
use rand::{rngs::SmallRng, SeedableRng};
>>>>>>> origin/master
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_ready_cache::{error::Failed, ReadyCache};
use tower_service::Service;
use tracing::{debug, trace};
@@ -40,14 +46,8 @@ use tracing::{debug, trace};
pub struct Balance<D: Discover, Req> {
discover: D,
ready_services: IndexMap<D::Key, D::Service>,
unready_services: stream::FuturesUnordered<UnreadyService<D::Key, D::Service, Req>>,
cancelations: IndexMap<D::Key, oneshot::Sender<()>>,
/// Holds an index into `endpoints`, indicating the service that has been
/// chosen to dispatch the next request.
next_ready_index: Option<usize>,
services: ReadyCache<D::Key, D::Service, Req>,
ready_index: Option<usize>,
rng: SmallRng,
@@ -94,6 +94,7 @@ impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: Into<error::Error>,
{
/// Initializes a P2C load balancer from the provided randomization source.
pub fn new(discover: D, rng: SmallRng) -> Self {
@@ -116,10 +117,9 @@ where
/// Returns the number of endpoints currently tracked by the balancer.
pub fn len(&self) -> usize {
self.ready_services.len() + self.unready_services.len()
self.services.len()
}
// XXX `pool::Pool` requires direct access to this... Not ideal.
pub(crate) fn discover_mut(&mut self) -> &mut D {
&mut self.discover
}
@@ -145,12 +145,13 @@ where
{
Change::Remove(key) => {
trace!("remove");
self.evict(&key)
self.services.evict(&key);
}
Change::Insert(key, svc) => {
Async::Ready(Change::Insert(key, svc)) => {
trace!("insert");
self.evict(&key);
self.push_unready(key, svc);
// If this service already existed in the set, it will be
// replaced as the new one becomes ready.
self.services.push(key, svc);
}
}
}
@@ -200,30 +201,16 @@ where
}
}
}
}
// Returns the updated index of `orig_idx` after the entry at `rm_idx` was
// swap-removed from an IndexMap with `orig_sz` items.
//
// If `orig_idx` is the same as `rm_idx`, None is returned to indicate that
// index cannot be repaired.
fn repair_index(orig_idx: usize, rm_idx: usize, new_sz: usize) -> Option<usize> {
debug_assert!(orig_idx <= new_sz && rm_idx <= new_sz);
let repaired = match orig_idx {
i if i == rm_idx => None, // removed
i if i == new_sz => Some(rm_idx), // swapped
i => Some(i), // unaffected
};
trace!(
{ next.idx = orig_idx, removed.idx = rm_idx, length = new_sz, repaired.idx = ?repaired },
"repairing index"
ready = %self.services.ready_len(),
pending = %self.services.pending_len(),
"poll_unready"
);
repaired
}
/// Performs P2C on inner services to find a suitable endpoint.
fn p2c_next_ready_index(&mut self) -> Option<usize> {
match self.ready_services.len() {
fn p2c_ready_index(&mut self) -> Option<usize> {
match self.services.ready_len() {
0 => None,
1 => Some(0),
len => {
@@ -237,17 +224,24 @@ where
let aload = self.ready_index_load(aidx);
let bload = self.ready_index_load(bidx);
let ready = if aload <= bload { aidx } else { bidx };
let chosen = if aload <= bload { aidx } else { bidx };
trace!({ a.idx = aidx, a.load = ?aload, b.idx = bidx, b.load = ?bload, ready = ?ready }, "choosing by load");
Some(ready)
trace!(
a.index = aidx,
a.load = ?aload,
b.index = bidx,
b.load = ?bload,
chosen = if chosen == aidx { "a" } else { "b" },
"p2c",
);
Some(chosen)
}
}
}
/// Accesses a ready endpoint by index and returns its current load.
fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
svc.load()
}
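The `p2c` trace above records the two sampled indices, their loads, and which one was chosen. As a rough illustration of that selection rule (a sketch only; `p2c_choose`, the plain slice of loads, and the rand 0.7-style `gen_range(low, high)` call are assumptions, not part of the balancer):

use rand::Rng;

// Power-of-two-choices selection: sample two distinct indices uniformly at
// random and keep the one whose load is lower.
fn p2c_choose<M: PartialOrd>(loads: &[M], rng: &mut impl Rng) -> Option<usize> {
    match loads.len() {
        0 => None,
        1 => Some(0),
        len => {
            // Pick `a` from all indices, then `b` from the remaining len - 1,
            // shifting it past `a` so the two indices are always distinct.
            let a = rng.gen_range(0, len);
            let mut b = rng.gen_range(0, len - 1);
            if b >= a {
                b += 1;
            }
            Some(if loads[a] <= loads[b] { a } else { b })
        }
    }
}

The balancer itself performs this comparison over its ready set, reading each candidate's load through `ready_index_load` as shown above.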
@@ -315,7 +309,7 @@ where
trace!({ nready = self.ready_services.len(), nunready = self.unready_services.len() }, "poll_ready");
loop {
// If a node has already been selected, ensure that it is ready.
// If a service has already been selected, ensure that it is ready.
// This ensures that the underlying service is ready immediately
// before a request is dispatched to it. If, e.g., a failure
// detector has changed the state of the service, it may be evicted
@@ -327,8 +321,6 @@ where
if let Poll::Ready(Ok(())) = self.poll_ready_index_or_evict(cx, index) {
return Poll::Ready(Ok(()));
}
self.next_ready_index = None;
}
self.next_ready_index = self.p2c_next_ready_index();

View File

@@ -8,7 +8,11 @@ name = "tower-buffer"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
<<<<<<< HEAD
version = "0.3.0-alpha.2"
=======
version = "0.1.2"
>>>>>>> origin/master
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"

View File

@@ -0,0 +1,35 @@
[package]
name = "tower-ready-cache"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-ready-cache/0.1.0"
description = """
Caches a set of services
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
publish = false
[dependencies]
futures = "0.1.26"
indexmap = "1.0.2"
log = "0.4.1"
tokio-sync = "0.1.3"
tower-service = "0.2.0"
tower-util = "0.1.0"
[dev-dependencies]
tokio-executor = "0.1.7"
tower-test = { version = "0.1.0", path = "../tower-test" }

View File

@@ -0,0 +1,374 @@
//! A cache of services.
use crate::error;
use futures::{stream, Async, Future, Poll, Stream};
pub use indexmap::Equivalent;
use indexmap::IndexMap;
use log::{debug, trace};
use std::hash::Hash;
use tokio_sync::oneshot;
use tower_service::Service;
/// Drives readiness over a set of services.
///
/// The cache maintains two internal data structures:
///
/// * a set of _pending_ services that have not yet become ready; and
/// * a set of _ready_ services that have previously polled ready.
///
/// As each `S` typed `Service` is added to the cache via `ReadyCache::push`, it
/// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked,
/// pending services are polled and added to the _ready set_.
///
/// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a
/// request to the specified service, but panics if the specified service is not
/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
/// that a service is ready before dispatching a request.
///
/// The ready set can hold services for an arbitrarily long time. During this
/// time, the runtime may process events that invalidate that ready state (for
/// instance, if a keepalive detects a lost connection). In such cases, callers
/// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`)
/// immediately before dispatching a request to ensure that the service has not
/// become unavailable.
///
/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
/// the _pending_ set to be driven to readiness again.
///
/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
/// specified service is _not_ ready. If an error is returned, this indicates that
/// the service failed and has been removed from the cache entirely.
///
/// `ReadyCache::evict` can be used to remove a service from the cache (by key),
/// though the service may not be dropped (if it is currently pending) until
/// `ReadyCache::poll_pending` is invoked.
///
/// Note that the by-index accessors are provided to support use cases (like
/// power-of-two-choices load balancing) where the caller does not care to keep
/// track of each service's key. Instead, it needs only to access _some_ ready
/// service. In such a case, it should be noted that calls to
/// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of
/// the ready set, so any cached indexes should be discarded after such a call.
#[derive(Debug)]
pub struct ReadyCache<K, S, Req>
where
K: Eq + Hash,
{
/// A stream of services that are not yet ready.
pending: stream::FuturesUnordered<Pending<K, S, Req>>,
/// An index of cancelation handles for pending streams.
pending_cancel_txs: IndexMap<K, CancelTx>,
/// Services that have previously become ready. Readiness can become stale,
/// so a given service should be polled immediately before use.
///
/// The cancelation oneshot is preserved (though unused) while the service is
/// ready so that it need not be reallocated each time a request is
/// dispatched.
ready: IndexMap<K, (S, CancelPair)>,
}
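A minimal usage sketch of the flow the doc comment above describes, written against the futures 0.1 API shown in this file and the `tower-test` mock used by the tests at the end of this diff (the `with_task` helper and the `readiness_flow` test name are illustrative, not part of this crate):

use futures::prelude::*;
use tower_ready_cache::ReadyCache;
use tower_test::mock;

// Runs `f` inside a task context so that the `poll_*` calls below are legal.
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
    futures::future::lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}

#[test]
fn readiness_flow() {
    type Mock = mock::Mock<&'static str, &'static str>;
    let mut cache = ReadyCache::<usize, Mock, &'static str>::default();

    let (service, mut handle) = mock::pair::<&'static str, &'static str>();
    handle.allow(1);

    // `push` places the service in the pending set...
    cache.push(0, service);
    assert_eq!(cache.pending_len(), 1);

    with_task(|| {
        // ...`poll_pending` promotes it to the ready set...
        assert!(cache.poll_pending().expect("poll").is_ready());
        assert_eq!(cache.ready_len(), 1);

        // ...readiness is re-checked immediately before dispatch...
        assert!(cache.check_ready_index(0).expect("check"));

        // ...and calling moves the service back into the pending set.
        let _rsp = cache.call_ready_index(0, "hello");
        assert_eq!(cache.pending_len(), 1);
    });
}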
type CancelRx = oneshot::Receiver<()>;
type CancelTx = oneshot::Sender<()>;
type CancelPair = (CancelTx, CancelRx);
#[derive(Debug)]
enum PendingError<K, E> {
Canceled(K),
Inner(K, E),
}
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the cache.
#[derive(Debug)]
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: tower_util::Ready<S, Req>,
}
// === ReadyCache ===
impl<K, S, Req> Default for ReadyCache<K, S, Req>
where
K: Eq + Hash,
S: Service<Req>,
{
fn default() -> Self {
Self {
ready: IndexMap::default(),
pending: stream::FuturesUnordered::new(),
pending_cancel_txs: IndexMap::default(),
}
}
}
impl<K, S, Req> ReadyCache<K, S, Req>
where
K: Eq + Hash,
{
/// Returns the total number of services in the cache.
pub fn len(&self) -> usize {
self.ready_len() + self.pending_len()
}
/// Returns the number of services in the ready set.
pub fn ready_len(&self) -> usize {
self.ready.len()
}
/// Returns the number of services in the unready set.
pub fn pending_len(&self) -> usize {
self.pending.len()
}
/// Returns true iff the given key is in the unready set.
pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
self.pending_cancel_txs.contains_key(key)
}
/// Obtains a reference to a service in the ready set by key.
pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
}
/// Obtains a mutable reference to a service in the ready set by key.
pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
&mut self,
key: &Q,
) -> Option<(usize, &K, &mut S)> {
self.ready
.get_full_mut(key)
.map(|(i, k, v)| (i, k, &mut v.0))
}
/// Obtains a reference to a service in the ready set by index.
pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
}
/// Obtains a mutable reference to a service in the ready set by index.
pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
}
/// Evicts an item from the cache.
///
/// Returns true if a service was marked for eviction.
///
/// Services are dropped from the ready set immediately. Services in the
/// pending set are marked for cancellation, but `ReadyCache::poll_pending`
/// must be called to cause the service to be dropped.
pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
c.send(()).expect("cancel receiver lost");
true
} else {
false
};
self.ready
.swap_remove_full(key)
.map(|_| true)
.unwrap_or(canceled)
}
}
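A short companion sketch of the eviction semantics described above, reusing the imports and `with_task` helper from the earlier sketch (the test name is illustrative): a pending service is only dropped once `poll_pending` observes its cancelation.

#[test]
fn evict_pending_service() {
    let mut cache =
        ReadyCache::<usize, mock::Mock<&'static str, &'static str>, &'static str>::default();
    let (service, mut handle) = mock::pair::<&'static str, &'static str>();
    handle.allow(0); // never becomes ready, so it stays in the pending set
    cache.push(0, service);

    // Eviction only marks the pending service for cancelation...
    assert!(cache.evict(&0));
    assert_eq!(cache.pending_len(), 1);

    // ...the service is actually dropped the next time `poll_pending` runs.
    with_task(|| {
        assert!(cache.poll_pending().expect("poll").is_ready());
    });
    assert_eq!(cache.len(), 0);
}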
impl<K, S, Req> ReadyCache<K, S, Req>
where
K: Clone + Eq + Hash,
S: Service<Req>,
<S as Service<Req>>::Error: Into<error::Error>,
S::Error: Into<error::Error>,
{
/// Pushes a new service onto the pending set.
///
/// The service will be promoted to the ready set as `poll_pending` is invoked.
///
/// Note that this does **not** remove services from the ready set. Once the
/// old service is used, it will be dropped instead of being added back to
/// the pending set; OR, when the new service becomes ready, it will replace
/// the prior service in the ready set.
pub fn push(&mut self, key: K, svc: S) {
let cancel = oneshot::channel();
self.push_pending(key, svc, cancel);
}
fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
// If there is already a service for this key, cancel it.
c.send(()).expect("cancel receiver lost");
}
self.pending.push(Pending {
key: Some(key),
cancel: Some(cancel_rx),
ready: tower_util::Ready::new(svc),
});
}
/// Polls services pending readiness, adding ready services to the ready set.
///
/// Returns `Async::Ready` when there are no remaining unready services.
/// `poll_pending` should be called again after `push` or
/// `call_ready_index` is invoked.
///
/// Failures indicate that an individual pending service failed to become
/// ready (and has been removed from the cache). In such a case,
/// `poll_pending` should typically be called again to continue driving
/// pending services to readiness.
pub fn poll_pending(&mut self) -> Poll<(), error::Failed<K>> {
loop {
match self.pending.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some((key, svc, cancel_rx)))) => {
trace!("endpoint ready");
let cancel_tx = self
.pending_cancel_txs
.swap_remove(&key)
.expect("missing cancelation");
// Keep track of the cancelation so that it need not be
// recreated after the service is used.
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
}
Err(PendingError::Canceled(_)) => {
debug!("endpoint canceled");
// Whoever triggered this cancelation already removed (or
// replaced) its entry in `pending_cancel_txs`, so there is
// nothing to clean up here.
}
Err(PendingError::Inner(key, e)) => {
self.pending_cancel_txs
.swap_remove(&key)
.expect("missing cancelation");
return Err(error::Failed(key, e.into()));
}
}
}
}
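Per the note above, an individual endpoint failure removes only that endpoint, so callers typically just poll again. A hedged sketch of that driving loop (the `drive_pending` name and the `on_failed` callback are illustrative, not part of this crate):

use futures::Poll;
use std::hash::Hash;
use tower_ready_cache::{error, ReadyCache};
use tower_service::Service;

fn drive_pending<K, S, Req>(
    cache: &mut ReadyCache<K, S, Req>,
    mut on_failed: impl FnMut(error::Failed<K>),
) -> Poll<(), ()>
where
    K: Clone + Eq + Hash,
    S: Service<Req>,
    S::Error: Into<error::Error>,
{
    loop {
        match cache.poll_pending() {
            // All pending services became ready, or some are still pending;
            // either way, surface the `Async` state to the caller.
            Ok(ready) => return Ok(ready),
            // One endpoint failed and was removed from the cache; report it
            // and keep driving the remaining pending services.
            Err(failed) => on_failed(failed),
        }
    }
}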
/// Checks whether the referenced endpoint is ready.
///
/// Returns true if the endpoint is ready and false if it is not. An error is
/// returned if the endpoint fails.
pub fn check_ready<Q: Hash + Equivalent<K>>(
&mut self,
key: &Q,
) -> Result<bool, error::Failed<K>> {
match self.ready.get_full_mut(key) {
Some((index, _, _)) => self.check_ready_index(index),
None => Ok(false),
}
}
/// Checks whether the referenced endpoint is ready.
///
/// If the service is no longer ready, it is moved back into the pending set
/// and `false` is returned.
///
/// If the service errors, it is removed and dropped and the error is returned.
pub fn check_ready_index(&mut self, index: usize) -> Result<bool, error::Failed<K>> {
let svc = match self.ready.get_index_mut(index) {
None => return Ok(false),
Some((_, (svc, _))) => svc,
};
match svc.poll_ready() {
Ok(Async::Ready(())) => Ok(true),
Ok(Async::NotReady) => {
// The service is no longer ready; move it back into the pending set.
let (key, (svc, cancel)) = self
.ready
.swap_remove_index(index)
.expect("invalid ready index");
// If a new version of this service has been added to the
// unready set, don't overwrite it.
if !self.pending_contains(&key) {
self.push_pending(key, svc, cancel);
}
Ok(false)
}
Err(e) => {
// failed, so drop it.
let (key, _) = self
.ready
.swap_remove_index(index)
.expect("invalid ready index");
Err(error::Failed(key, e.into()))
}
}
}
/// Calls a ready service by key.
///
/// # Panics
///
/// If the specified key does not exist in the ready set.
pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
let (index, _, _) = self
.ready
.get_full_mut(key)
.expect("check_ready was not called");
self.call_ready_index(index, req)
}
/// Calls a ready service by index.
///
/// # Panics
///
/// If the specified index is out of range.
pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
let (key, (mut svc, cancel)) = self
.ready
.swap_remove_index(index)
.expect("check_ready_index was not called");
let fut = svc.call(req);
// If a new version of this service has been added to the
// unready set, don't overwrite it.
if !self.pending_contains(&key) {
self.push_pending(key, svc, cancel);
}
fut
}
}
// === Pending ===
impl<K, S, Req> Future for Pending<K, S, Req>
where
S: Service<Req>,
{
type Item = (K, S, CancelRx);
type Error = PendingError<K, S::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self
.cancel
.as_mut()
.expect("polled after complete")
.poll()
.expect("cancel sender lost")
.is_ready()
{
let key = self.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key));
}
match self.ready.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(svc)) => {
let key = self.key.take().expect("polled after complete");
let cancel = self.cancel.take().expect("polled after complete");
Ok((key, svc, cancel).into())
}
Err(e) => {
let key = self.key.take().expect("polled after complete");
Err(PendingError::Inner(key, e))
}
}
}
}

View File

@@ -0,0 +1,28 @@
//! Errors
/// A generic error type.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
/// An error indicating that the service with a `K`-typed key failed with an
/// error.
pub struct Failed<K>(pub K, pub Error);
// === Failed ===
impl<K> std::fmt::Debug for Failed<K> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.1, f)
}
}
impl<K> std::fmt::Display for Failed<K> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
self.1.fmt(f)
}
}
impl<K> std::error::Error for Failed<K> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.1.source()
}
}

View File

@@ -0,0 +1,12 @@
//! A cache of services
#![doc(html_root_url = "https://docs.rs/tower-ready-cache/0.1.0")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
#![deny(warnings)]
pub mod cache;
pub mod error;
pub use self::cache::ReadyCache;

View File

@@ -0,0 +1,81 @@
use futures::prelude::*;
use tower_ready_cache::{error, ReadyCache};
use tower_test::mock;
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::lazy;
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}
type Req = &'static str;
type Mock = mock::Mock<Req, Req>;
#[test]
fn poll_ready_inner_failure() {
let mut cache = ReadyCache::<usize, Mock, Req>::default();
let (service0, mut handle0) = mock::pair::<Req, Req>();
handle0.send_error("doom");
cache.push(0, service0);
let (service1, mut handle1) = mock::pair::<Req, Req>();
handle1.allow(1);
cache.push(1, service1);
with_task(|| {
let error::Failed(key, err) = cache
.poll_pending()
.err()
.expect("poll_ready should fail when exhausted");
assert_eq!(key, 0);
assert_eq!(format!("{}", err), "doom");
});
assert_eq!(cache.len(), 1);
}
#[test]
fn poll_ready_not_ready() {
let mut cache = ReadyCache::<usize, Mock, Req>::default();
let (service0, mut handle0) = mock::pair::<Req, Req>();
handle0.allow(0);
cache.push(0, service0);
let (service1, mut handle1) = mock::pair::<Req, Req>();
handle1.allow(0);
cache.push(1, service1);
with_task(|| {
assert!(cache.poll_pending().expect("must succeed").is_not_ready());
});
assert_eq!(cache.ready_len(), 0);
assert_eq!(cache.pending_len(), 2);
assert_eq!(cache.len(), 2);
}
#[test]
fn poll_ready_promotes_inner() {
let mut cache = ReadyCache::<usize, Mock, Req>::default();
let (service0, mut handle0) = mock::pair::<Req, Req>();
handle0.allow(1);
cache.push(0, service0);
let (service1, mut handle1) = mock::pair::<Req, Req>();
handle1.allow(1);
cache.push(1, service1);
assert_eq!(cache.ready_len(), 0);
assert_eq!(cache.pending_len(), 2);
assert_eq!(cache.len(), 2);
with_task(|| {
assert!(cache.poll_pending().expect("must succeed").is_ready());
});
assert_eq!(cache.ready_len(), 2);
assert_eq!(cache.pending_len(), 0);
assert_eq!(cache.len(), 2);
}