Introduce tower-ready-cache (#303)
In #293, `balance` was refactored to manage dispatching requests over a set of equivalent inner services that may or may not be ready. This change extracts the core logic of managing a cache of ready services into a dedicated crate, leaving the balance crate to deal with node selection.
This commit is contained in:
parent
2d24d84e7c
commit
7e55b7fa0b
|
@ -11,6 +11,7 @@ members = [
|
|||
"tower-limit",
|
||||
"tower-load",
|
||||
"tower-load-shed",
|
||||
"tower-ready-cache",
|
||||
"tower-reconnect",
|
||||
"tower-retry",
|
||||
"tower-service",
|
||||
|
|
|
@ -36,6 +36,7 @@ tokio-timer = "0.2.4"
|
|||
tower-discover = "0.1.0"
|
||||
tower-layer = "0.1.0"
|
||||
tower-load = { version = "0.1.0", path = "../tower-load" }
|
||||
tower-ready-cache = { version = "0.1.0", path = "../tower-ready-cache" }
|
||||
tower-service = "0.2.0"
|
||||
tower-util = "0.1.0"
|
||||
tracing = "0.1.0"
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use super::Balance;
|
||||
use crate::error;
|
||||
use futures::{try_ready, Future, Poll};
|
||||
use rand::{rngs::SmallRng, SeedableRng};
|
||||
use std::marker::PhantomData;
|
||||
|
@ -40,6 +41,7 @@ where
|
|||
S: Service<Target>,
|
||||
S::Response: Discover,
|
||||
<S::Response as Discover>::Service: Service<Req>,
|
||||
<<S::Response as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
|
||||
{
|
||||
type Response = Balance<S::Response, Req>;
|
||||
type Error = S::Error;
|
||||
|
@ -63,6 +65,7 @@ where
|
|||
F: Future,
|
||||
F::Item: Discover,
|
||||
<F::Item as Discover>::Service: Service<Req>,
|
||||
<<F::Item as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
|
||||
{
|
||||
type Item = Balance<F::Item, Req>;
|
||||
type Error = F::Error;
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
use crate::error;
|
||||
use futures::{future, stream, try_ready, Async, Future, Poll, Stream};
|
||||
use indexmap::IndexMap;
|
||||
use futures::{future, Async, Future, Poll};
|
||||
use rand::{rngs::SmallRng, SeedableRng};
|
||||
use tokio_sync::oneshot;
|
||||
use tower_discover::{Change, Discover};
|
||||
use tower_load::Load;
|
||||
use tower_ready_cache::{error::Failed, ReadyCache};
|
||||
use tower_service::Service;
|
||||
use tower_util::Ready;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
/// Distributes requests across inner services using the [Power of Two Choices][p2c].
|
||||
|
@ -26,47 +24,25 @@ use tracing::{debug, trace};
|
|||
pub struct Balance<D: Discover, Req> {
|
||||
discover: D,
|
||||
|
||||
ready_services: IndexMap<D::Key, D::Service>,
|
||||
|
||||
unready_services: stream::FuturesUnordered<UnreadyService<D::Key, D::Service, Req>>,
|
||||
cancelations: IndexMap<D::Key, oneshot::Sender<()>>,
|
||||
|
||||
/// Holds an index into `endpoints`, indicating the service that has been
|
||||
/// chosen to dispatch the next request.
|
||||
next_ready_index: Option<usize>,
|
||||
services: ReadyCache<D::Key, D::Service, Req>,
|
||||
ready_index: Option<usize>,
|
||||
|
||||
rng: SmallRng,
|
||||
}
|
||||
|
||||
/// A Future that becomes satisfied when an `S`-typed service is ready.
|
||||
///
|
||||
/// May fail due to cancelation, i.e. if the service is removed from discovery.
|
||||
#[derive(Debug)]
|
||||
struct UnreadyService<K, S, Req> {
|
||||
key: Option<K>,
|
||||
cancel: oneshot::Receiver<()>,
|
||||
ready: tower_util::Ready<S, Req>,
|
||||
}
|
||||
|
||||
enum Error<E> {
|
||||
Inner(E),
|
||||
Canceled,
|
||||
}
|
||||
|
||||
impl<D, Req> Balance<D, Req>
|
||||
where
|
||||
D: Discover,
|
||||
D::Service: Service<Req>,
|
||||
<D::Service as Service<Req>>::Error: Into<error::Error>,
|
||||
{
|
||||
/// Initializes a P2C load balancer from the provided randomization source.
|
||||
pub fn new(discover: D, rng: SmallRng) -> Self {
|
||||
Self {
|
||||
rng,
|
||||
discover,
|
||||
ready_services: IndexMap::default(),
|
||||
cancelations: IndexMap::default(),
|
||||
unready_services: stream::FuturesUnordered::new(),
|
||||
next_ready_index: None,
|
||||
ready_index: None,
|
||||
services: ReadyCache::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -77,10 +53,9 @@ where
|
|||
|
||||
/// Returns the number of endpoints currently tracked by the balancer.
|
||||
pub fn len(&self) -> usize {
|
||||
self.ready_services.len() + self.unready_services.len()
|
||||
self.services.len()
|
||||
}
|
||||
|
||||
// XXX `pool::Pool` requires direct access to this... Not ideal.
|
||||
pub(crate) fn discover_mut(&mut self) -> &mut D {
|
||||
&mut self.discover
|
||||
}
|
||||
|
@ -98,88 +73,59 @@ where
|
|||
/// Polls `discover` for updates, adding new items to `not_ready`.
|
||||
///
|
||||
/// Removals may alter the order of either `ready` or `not_ready`.
|
||||
fn poll_discover(&mut self) -> Poll<(), error::Discover> {
|
||||
fn update_pending_from_discover(&mut self) -> Result<(), error::Discover> {
|
||||
debug!("updating from discover");
|
||||
loop {
|
||||
match try_ready!(self.discover.poll().map_err(|e| error::Discover(e.into()))) {
|
||||
Change::Remove(key) => {
|
||||
match self
|
||||
.discover
|
||||
.poll()
|
||||
.map_err(|e| error::Discover(e.into()))?
|
||||
{
|
||||
Async::NotReady => return Ok(()),
|
||||
Async::Ready(Change::Remove(key)) => {
|
||||
trace!("remove");
|
||||
self.evict(&key)
|
||||
self.services.evict(&key);
|
||||
}
|
||||
Change::Insert(key, svc) => {
|
||||
Async::Ready(Change::Insert(key, svc)) => {
|
||||
trace!("insert");
|
||||
self.evict(&key);
|
||||
self.push_unready(key, svc);
|
||||
// If this service already existed in the set, it will be
|
||||
// replaced as the new one becomes ready.
|
||||
self.services.push(key, svc);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.cancelations.insert(key.clone(), tx);
|
||||
self.unready_services.push(UnreadyService {
|
||||
key: Some(key),
|
||||
ready: Ready::new(svc),
|
||||
cancel: rx,
|
||||
});
|
||||
}
|
||||
|
||||
fn evict(&mut self, key: &D::Key) {
|
||||
// Update the ready index to account for reordering of ready.
|
||||
if let Some((idx, _, _)) = self.ready_services.swap_remove_full(key) {
|
||||
self.next_ready_index = self
|
||||
.next_ready_index
|
||||
.and_then(|i| Self::repair_index(i, idx, self.ready_services.len()));
|
||||
debug_assert!(!self.cancelations.contains_key(key));
|
||||
} else if let Some(cancel) = self.cancelations.swap_remove(key) {
|
||||
let _ = cancel.send(());
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_unready(&mut self) {
|
||||
fn promote_pending_to_ready(&mut self) {
|
||||
loop {
|
||||
match self.unready_services.poll() {
|
||||
Ok(Async::NotReady) | Ok(Async::Ready(None)) => return,
|
||||
Ok(Async::Ready(Some((key, svc)))) => {
|
||||
trace!("endpoint ready");
|
||||
let _cancel = self.cancelations.swap_remove(&key);
|
||||
debug_assert!(_cancel.is_some(), "missing cancelation");
|
||||
self.ready_services.insert(key, svc);
|
||||
match self.services.poll_pending() {
|
||||
Ok(Async::Ready(())) => {
|
||||
// There are no remaining pending services.
|
||||
debug_assert_eq!(self.services.pending_len(), 0);
|
||||
break;
|
||||
}
|
||||
Err((key, Error::Canceled)) => debug_assert!(!self.cancelations.contains_key(&key)),
|
||||
Err((key, Error::Inner(e))) => {
|
||||
let error = e.into();
|
||||
debug!({ %error }, "dropping failed endpoint");
|
||||
let _cancel = self.cancelations.swap_remove(&key);
|
||||
debug_assert!(_cancel.is_some());
|
||||
Ok(Async::NotReady) => {
|
||||
// None of the pending services are ready.
|
||||
debug_assert!(self.services.pending_len() > 0);
|
||||
break;
|
||||
}
|
||||
Err(error) => {
|
||||
// An individual service was lost; continue processing
|
||||
// pending services.
|
||||
debug!(%error, "dropping failed endpoint");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the updated index of `orig_idx` after the entry at `rm_idx` was
|
||||
// swap-removed from an IndexMap with `orig_sz` items.
|
||||
//
|
||||
// If `orig_idx` is the same as `rm_idx`, None is returned to indicate that
|
||||
// index cannot be repaired.
|
||||
fn repair_index(orig_idx: usize, rm_idx: usize, new_sz: usize) -> Option<usize> {
|
||||
debug_assert!(orig_idx <= new_sz && rm_idx <= new_sz);
|
||||
let repaired = match orig_idx {
|
||||
i if i == rm_idx => None, // removed
|
||||
i if i == new_sz => Some(rm_idx), // swapped
|
||||
i => Some(i), // uneffected
|
||||
};
|
||||
trace!(
|
||||
{ next.idx = orig_idx, removed.idx = rm_idx, length = new_sz, repaired.idx = ?repaired },
|
||||
"repairing index"
|
||||
ready = %self.services.ready_len(),
|
||||
pending = %self.services.pending_len(),
|
||||
"poll_unready"
|
||||
);
|
||||
repaired
|
||||
}
|
||||
|
||||
/// Performs P2C on inner services to find a suitable endpoint.
|
||||
fn p2c_next_ready_index(&mut self) -> Option<usize> {
|
||||
match self.ready_services.len() {
|
||||
fn p2c_ready_index(&mut self) -> Option<usize> {
|
||||
match self.services.ready_len() {
|
||||
0 => None,
|
||||
1 => Some(0),
|
||||
len => {
|
||||
|
@ -193,48 +139,26 @@ where
|
|||
|
||||
let aload = self.ready_index_load(aidx);
|
||||
let bload = self.ready_index_load(bidx);
|
||||
let ready = if aload <= bload { aidx } else { bidx };
|
||||
let chosen = if aload <= bload { aidx } else { bidx };
|
||||
|
||||
trace!({ a.idx = aidx, a.load = ?aload, b.idx = bidx, b.load = ?bload, ready = ?ready }, "choosing by load");
|
||||
Some(ready)
|
||||
trace!(
|
||||
a.index = aidx,
|
||||
a.load = ?aload,
|
||||
b.index = bidx,
|
||||
b.load = ?bload,
|
||||
chosen = if chosen == aidx { "a" } else { "b" },
|
||||
"p2c",
|
||||
);
|
||||
Some(chosen)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Accesses a ready endpoint by index and returns its current load.
|
||||
fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
|
||||
let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
|
||||
let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
|
||||
svc.load()
|
||||
}
|
||||
|
||||
fn poll_ready_index_or_evict(&mut self, index: usize) -> Poll<(), ()> {
|
||||
let (_, svc) = self
|
||||
.ready_services
|
||||
.get_index_mut(index)
|
||||
.expect("invalid index");
|
||||
|
||||
match svc.poll_ready() {
|
||||
Ok(Async::Ready(())) => Ok(Async::Ready(())),
|
||||
Ok(Async::NotReady) => {
|
||||
// became unready; so move it back there.
|
||||
let (key, svc) = self
|
||||
.ready_services
|
||||
.swap_remove_index(index)
|
||||
.expect("invalid ready index");
|
||||
self.push_unready(key, svc);
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
Err(e) => {
|
||||
// failed, so drop it.
|
||||
let error = e.into();
|
||||
debug!({ %error }, "evicting failed endpoint");
|
||||
self.ready_services
|
||||
.swap_remove_index(index)
|
||||
.expect("invalid ready index");
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<D, Req> Service<Req> for Balance<D, Req>
|
||||
|
@ -253,79 +177,54 @@ where
|
|||
fn(<D::Service as Service<Req>>::Error) -> error::Error,
|
||||
>;
|
||||
|
||||
/// Prepares the balancer to process a request.
|
||||
///
|
||||
/// When `Async::Ready` is returned, `ready_index` is set with a valid index
|
||||
/// into `ready` referring to a `Service` that is ready to disptach a request.
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
// First and foremost, process discovery updates. This removes or updates a
|
||||
// previously-selected `ready_index` if appropriate.
|
||||
self.poll_discover()?;
|
||||
|
||||
// Drive new or busy services to readiness.
|
||||
self.poll_unready();
|
||||
trace!({ nready = self.ready_services.len(), nunready = self.unready_services.len() }, "poll_ready");
|
||||
// `ready_index` may have already been set by a prior invocation. These
|
||||
// updates cannot disturb the order of existing ready services.
|
||||
self.update_pending_from_discover()?;
|
||||
self.promote_pending_to_ready();
|
||||
|
||||
loop {
|
||||
// If a node has already been selected, ensure that it is ready.
|
||||
// If a service has already been selected, ensure that it is ready.
|
||||
// This ensures that the underlying service is ready immediately
|
||||
// before a request is dispatched to it. If, e.g., a failure
|
||||
// detector has changed the state of the service, it may be evicted
|
||||
// from the ready set so that P2C can be performed again.
|
||||
if let Some(index) = self.next_ready_index {
|
||||
trace!({ next.idx = index }, "preselected ready_index");
|
||||
debug_assert!(index < self.ready_services.len());
|
||||
|
||||
if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
|
||||
return Ok(Async::Ready(()));
|
||||
// before a request is dispatched to it (i.e. in the same task
|
||||
// invocation). If, e.g., a failure detector has changed the state
|
||||
// of the service, it may be evicted from the ready set so that
|
||||
// another service can be selected.
|
||||
if let Some(index) = self.ready_index.take() {
|
||||
match self.services.check_ready_index(index) {
|
||||
Ok(true) => {
|
||||
// The service remains ready.
|
||||
self.ready_index = Some(index);
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
Ok(false) => {
|
||||
// The service is no longer ready. Try to find a new one.
|
||||
trace!("ready service became unavailable");
|
||||
}
|
||||
Err(Failed(_, error)) => {
|
||||
// The ready endpoint failed, so log the error and try
|
||||
// to find a new one.
|
||||
debug!(%error, "endpoint failed");
|
||||
}
|
||||
}
|
||||
|
||||
self.next_ready_index = None;
|
||||
}
|
||||
|
||||
self.next_ready_index = self.p2c_next_ready_index();
|
||||
if self.next_ready_index.is_none() {
|
||||
debug_assert!(self.ready_services.is_empty());
|
||||
// Select a new service by comparing two at random and using the
|
||||
// lesser-loaded service.
|
||||
self.ready_index = self.p2c_ready_index();
|
||||
if self.ready_index.is_none() {
|
||||
debug_assert_eq!(self.services.ready_len(), 0);
|
||||
// We have previously registered interest in updates from
|
||||
// discover and pending services.
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Req) -> Self::Future {
|
||||
let index = self.next_ready_index.take().expect("not ready");
|
||||
let (key, mut svc) = self
|
||||
.ready_services
|
||||
.swap_remove_index(index)
|
||||
.expect("invalid ready index");
|
||||
// no need to repair since the ready_index has been cleared.
|
||||
|
||||
let fut = svc.call(request);
|
||||
self.push_unready(key, svc);
|
||||
|
||||
fut.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
|
||||
type Item = (K, S);
|
||||
type Error = (K, Error<S::Error>);
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Ok(Async::Ready(())) = self.cancel.poll() {
|
||||
let key = self.key.take().expect("polled after ready");
|
||||
return Err((key, Error::Canceled));
|
||||
}
|
||||
|
||||
match self.ready.poll() {
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Ok(Async::Ready(svc)) => {
|
||||
let key = self.key.take().expect("polled after ready");
|
||||
Ok((key, svc).into())
|
||||
}
|
||||
Err(e) => {
|
||||
let key = self.key.take().expect("polled after ready");
|
||||
Err((key, Error::Inner(e)))
|
||||
}
|
||||
}
|
||||
let index = self.ready_index.take().expect("called before ready");
|
||||
self.services
|
||||
.call_ready_index(index, request)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
[package]
|
||||
name = "tower-ready-cache"
|
||||
# When releasing to crates.io:
|
||||
# - Remove path dependencies
|
||||
# - Update html_root_url.
|
||||
# - Update doc url
|
||||
# - Cargo.toml
|
||||
# - README.md
|
||||
# - Update CHANGELOG.md.
|
||||
# - Create "v0.1.x" git tag.
|
||||
version = "0.1.0"
|
||||
authors = ["Tower Maintainers <team@tower-rs.com>"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/tower-rs/tower"
|
||||
homepage = "https://github.com/tower-rs/tower"
|
||||
documentation = "https://docs.rs/tower-ready-cache/0.1.0"
|
||||
description = """
|
||||
Caches a set of services
|
||||
"""
|
||||
categories = ["asynchronous", "network-programming"]
|
||||
edition = "2018"
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
futures = "0.1.26"
|
||||
indexmap = "1.0.2"
|
||||
log = "0.4.1"
|
||||
tokio-sync = "0.1.3"
|
||||
tower-service = "0.2.0"
|
||||
tower-util = "0.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-executor = "0.1.7"
|
||||
tower-test = { version = "0.1.0", path = "../tower-test" }
|
|
@ -0,0 +1,374 @@
|
|||
//! A cache of services.
|
||||
|
||||
use crate::error;
|
||||
use futures::{stream, Async, Future, Poll, Stream};
|
||||
pub use indexmap::Equivalent;
|
||||
use indexmap::IndexMap;
|
||||
use log::{debug, trace};
|
||||
use std::hash::Hash;
|
||||
use tokio_sync::oneshot;
|
||||
use tower_service::Service;
|
||||
|
||||
/// Drives readiness over a set of services.
|
||||
///
|
||||
/// The cache maintains two internal data structures:
|
||||
///
|
||||
/// * a set of _pending_ services that have not yet become ready; and
|
||||
/// * a set of _ready_ services that have previously polled ready.
|
||||
///
|
||||
/// As each `S` typed `Service` is added to the cache via `ReadyCache::push`, it
|
||||
/// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked,
|
||||
/// pending services are polled and added to the _ready set_.
|
||||
///
|
||||
/// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a
|
||||
/// request to the specified service, but panics if the specified service is not
|
||||
/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
|
||||
/// that a service is ready before dispatching a request.
|
||||
///
|
||||
/// The ready set can hold services for an abitrarily long time. During this
|
||||
/// time, the runtime may process events that invalidate that ready state (for
|
||||
/// instance, if a keepalive detects a lost connection). In such cases, callers
|
||||
/// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`)
|
||||
/// immediately before dispatching a request to ensure that the service has not
|
||||
/// become unavailable.
|
||||
///
|
||||
/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
|
||||
/// the _pending_ set to be driven to readiness again.
|
||||
///
|
||||
/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
|
||||
/// specified service is _not_ ready. If an error is returned, this indicats that
|
||||
/// the server failed nad has been removed from the cache entirely.
|
||||
///
|
||||
/// `ReadyCache::evict` can be used to remove a service from the cache (by key),
|
||||
/// though the service may not be dropped (if it is currently pending) until
|
||||
/// `ReadyCache::poll_pending` is invoked.
|
||||
///
|
||||
/// Note that the by-index accessors are provided to support use cases (like
|
||||
/// power-of-two-choices load balancing) where the caller does not care to keep
|
||||
/// track of each service's key. Instead, it needs only to access _some_ ready
|
||||
/// service. In such a case, it should be noted that calls to
|
||||
/// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of
|
||||
/// the ready set, so any cached indexes should be discarded after such a call.
|
||||
#[derive(Debug)]
|
||||
pub struct ReadyCache<K, S, Req>
|
||||
where
|
||||
K: Eq + Hash,
|
||||
{
|
||||
/// A stream of services that are not yet ready.
|
||||
pending: stream::FuturesUnordered<Pending<K, S, Req>>,
|
||||
/// An index of cancelation handles for pending streams.
|
||||
pending_cancel_txs: IndexMap<K, CancelTx>,
|
||||
|
||||
/// Services that have previously become ready. Readiness can become stale,
|
||||
/// so a given service should be polled immediately before use.
|
||||
///
|
||||
/// The cancelation oneshot is preserved (though unused) while the service is
|
||||
/// ready so that it need not be reallocated each time a request is
|
||||
/// dispatched.
|
||||
ready: IndexMap<K, (S, CancelPair)>,
|
||||
}
|
||||
|
||||
type CancelRx = oneshot::Receiver<()>;
|
||||
type CancelTx = oneshot::Sender<()>;
|
||||
type CancelPair = (CancelTx, CancelRx);
|
||||
|
||||
#[derive(Debug)]
|
||||
enum PendingError<K, E> {
|
||||
Canceled(K),
|
||||
Inner(K, E),
|
||||
}
|
||||
|
||||
/// A Future that becomes satisfied when an `S`-typed service is ready.
|
||||
///
|
||||
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
|
||||
#[derive(Debug)]
|
||||
struct Pending<K, S, Req> {
|
||||
key: Option<K>,
|
||||
cancel: Option<CancelRx>,
|
||||
ready: tower_util::Ready<S, Req>,
|
||||
}
|
||||
|
||||
// === ReadyCache ===
|
||||
|
||||
impl<K, S, Req> Default for ReadyCache<K, S, Req>
|
||||
where
|
||||
K: Eq + Hash,
|
||||
S: Service<Req>,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
ready: IndexMap::default(),
|
||||
pending: stream::FuturesUnordered::new(),
|
||||
pending_cancel_txs: IndexMap::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, S, Req> ReadyCache<K, S, Req>
|
||||
where
|
||||
K: Eq + Hash,
|
||||
{
|
||||
/// Returns the total number of services in the cache.
|
||||
pub fn len(&self) -> usize {
|
||||
self.ready_len() + self.pending_len()
|
||||
}
|
||||
|
||||
/// Returns the number of services in the ready set.
|
||||
pub fn ready_len(&self) -> usize {
|
||||
self.ready.len()
|
||||
}
|
||||
|
||||
/// Returns the number of services in the unready set.
|
||||
pub fn pending_len(&self) -> usize {
|
||||
self.pending.len()
|
||||
}
|
||||
|
||||
/// Returns true iff the given key is in the unready set.
|
||||
pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
|
||||
self.pending_cancel_txs.contains_key(key)
|
||||
}
|
||||
|
||||
/// Obtains a reference to a service in the ready set by key.
|
||||
pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
|
||||
self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
|
||||
}
|
||||
|
||||
/// Obtains a mutable reference to a service in the ready set by key.
|
||||
pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
|
||||
&mut self,
|
||||
key: &Q,
|
||||
) -> Option<(usize, &K, &mut S)> {
|
||||
self.ready
|
||||
.get_full_mut(key)
|
||||
.map(|(i, k, v)| (i, k, &mut v.0))
|
||||
}
|
||||
|
||||
/// Obtains a reference to a service in the ready set by index.
|
||||
pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
|
||||
self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
|
||||
}
|
||||
|
||||
/// Obtains a mutable reference to a service in the ready set by index.
|
||||
pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
|
||||
self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
|
||||
}
|
||||
|
||||
/// Evicts an item from the cache.
|
||||
///
|
||||
/// Returns true if a service was marked for eviction.
|
||||
///
|
||||
/// Services are dropped from the ready set immediately. Services in the
|
||||
/// pending set are marked for cancellation, but `ReadyCache::poll_pending`
|
||||
/// must be called to cause the service to be dropped.
|
||||
pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
|
||||
let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
|
||||
c.send(()).expect("cancel receiver lost");
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
self.ready
|
||||
.swap_remove_full(key)
|
||||
.map(|_| true)
|
||||
.unwrap_or(canceled)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, S, Req> ReadyCache<K, S, Req>
|
||||
where
|
||||
K: Clone + Eq + Hash,
|
||||
S: Service<Req>,
|
||||
<S as Service<Req>>::Error: Into<error::Error>,
|
||||
S::Error: Into<error::Error>,
|
||||
{
|
||||
/// Pushes a new service onto the pending set.
|
||||
///
|
||||
/// The service will be promoted to the ready set as `poll_pending` is invoked.
|
||||
///
|
||||
/// Note that this does **not** remove services from the ready set. Once the
|
||||
/// old service is used, it will be dropped instead of being added back to
|
||||
/// the pending set; OR, when the new service becomes ready, it will replace
|
||||
/// the prior service in the ready set.
|
||||
pub fn push(&mut self, key: K, svc: S) {
|
||||
let cancel = oneshot::channel();
|
||||
self.push_pending(key, svc, cancel);
|
||||
}
|
||||
|
||||
fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
|
||||
if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
|
||||
// If there is already a service for this key, cancel it.
|
||||
c.send(()).expect("cancel receiver lost");
|
||||
}
|
||||
self.pending.push(Pending {
|
||||
key: Some(key),
|
||||
cancel: Some(cancel_rx),
|
||||
ready: tower_util::Ready::new(svc),
|
||||
});
|
||||
}
|
||||
|
||||
/// Polls services pending readiness, adding ready services to the ready set.
|
||||
///
|
||||
/// Returns `Async::Ready` when there are no remaining unready services.
|
||||
/// `poll_pending` should be called again after `push_service` or
|
||||
/// `call_ready_index` are invoked.
|
||||
///
|
||||
/// Failures indicate that an individual pending service failed to become
|
||||
/// ready (and has been removed from the cache). In such a case,
|
||||
/// `poll_pending` should typically be called again to continue driving
|
||||
/// pending services to readiness.
|
||||
pub fn poll_pending(&mut self) -> Poll<(), error::Failed<K>> {
|
||||
loop {
|
||||
match self.pending.poll() {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
Ok(Async::Ready(Some((key, svc, cancel_rx)))) => {
|
||||
trace!("endpoint ready");
|
||||
let cancel_tx = self
|
||||
.pending_cancel_txs
|
||||
.swap_remove(&key)
|
||||
.expect("missing cancelation");
|
||||
// Keep track of the cancelation so that it need not be
|
||||
// recreated after the service is used.
|
||||
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
|
||||
}
|
||||
Err(PendingError::Canceled(_)) => {
|
||||
debug!("endpoint canceled");
|
||||
// The cancellation for this service was removed in order to
|
||||
// cause this cancellation.
|
||||
}
|
||||
Err(PendingError::Inner(key, e)) => {
|
||||
self.pending_cancel_txs
|
||||
.swap_remove(&key)
|
||||
.expect("missing cancelation");
|
||||
return Err(error::Failed(key, e.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether the referenced endpoint is ready.
|
||||
///
|
||||
/// Returns true if the endpoint is ready and false if it is not. An error is
|
||||
/// returned if the endpoint fails.
|
||||
pub fn check_ready<Q: Hash + Equivalent<K>>(
|
||||
&mut self,
|
||||
key: &Q,
|
||||
) -> Result<bool, error::Failed<K>> {
|
||||
match self.ready.get_full_mut(key) {
|
||||
Some((index, _, _)) => self.check_ready_index(index),
|
||||
None => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether the referenced endpoint is ready.
|
||||
///
|
||||
/// If the service is no longer ready, it is moved back into the pending set
|
||||
/// and `false` is returned.
|
||||
///
|
||||
/// If the service errors, it is removed and dropped and the error is returned.
|
||||
pub fn check_ready_index(&mut self, index: usize) -> Result<bool, error::Failed<K>> {
|
||||
let svc = match self.ready.get_index_mut(index) {
|
||||
None => return Ok(false),
|
||||
Some((_, (svc, _))) => svc,
|
||||
};
|
||||
match svc.poll_ready() {
|
||||
Ok(Async::Ready(())) => Ok(true),
|
||||
Ok(Async::NotReady) => {
|
||||
// became unready; so move it back there.
|
||||
let (key, (svc, cancel)) = self
|
||||
.ready
|
||||
.swap_remove_index(index)
|
||||
.expect("invalid ready index");
|
||||
|
||||
// If a new version of this service has been added to the
|
||||
// unready set, don't overwrite it.
|
||||
if !self.pending_contains(&key) {
|
||||
self.push_pending(key, svc, cancel);
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
Err(e) => {
|
||||
// failed, so drop it.
|
||||
let (key, _) = self
|
||||
.ready
|
||||
.swap_remove_index(index)
|
||||
.expect("invalid ready index");
|
||||
Err(error::Failed(key, e.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls a ready service by key.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the specified key does not exist in the ready
|
||||
pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
|
||||
let (index, _, _) = self
|
||||
.ready
|
||||
.get_full_mut(key)
|
||||
.expect("check_ready was not called");
|
||||
self.call_ready_index(index, req)
|
||||
}
|
||||
|
||||
/// Calls a ready service by index.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the specified index is out of range.
|
||||
pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
|
||||
let (key, (mut svc, cancel)) = self
|
||||
.ready
|
||||
.swap_remove_index(index)
|
||||
.expect("check_ready_index was not called");
|
||||
|
||||
let fut = svc.call(req);
|
||||
|
||||
// If a new version of this service has been added to the
|
||||
// unready set, don't overwrite it.
|
||||
if !self.pending_contains(&key) {
|
||||
self.push_pending(key, svc, cancel);
|
||||
}
|
||||
|
||||
fut
|
||||
}
|
||||
}
|
||||
|
||||
// === Pending ===
|
||||
|
||||
impl<K, S, Req> Future for Pending<K, S, Req>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Item = (K, S, CancelRx);
|
||||
type Error = PendingError<K, S::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if self
|
||||
.cancel
|
||||
.as_mut()
|
||||
.expect("polled after complete")
|
||||
.poll()
|
||||
.expect("cancel sender lost")
|
||||
.is_ready()
|
||||
{
|
||||
let key = self.key.take().expect("polled after complete");
|
||||
return Err(PendingError::Canceled(key));
|
||||
}
|
||||
|
||||
match self.ready.poll() {
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Ok(Async::Ready(svc)) => {
|
||||
let key = self.key.take().expect("polled after complete");
|
||||
let cancel = self.cancel.take().expect("polled after complete");
|
||||
Ok((key, svc, cancel).into())
|
||||
}
|
||||
Err(e) => {
|
||||
let key = self.key.take().expect("polled after compete");
|
||||
Err(PendingError::Inner(key, e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
//! Errors
|
||||
|
||||
/// A generic error type.
|
||||
pub type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
/// An error indicating that the service with a `K`-typed key failed with an
|
||||
/// error.
|
||||
pub struct Failed<K>(pub K, pub Error);
|
||||
|
||||
// === Failed ===
|
||||
|
||||
impl<K> std::fmt::Debug for Failed<K> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
std::fmt::Debug::fmt(&self.1, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> std::fmt::Display for Failed<K> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
self.1.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> std::error::Error for Failed<K> {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
self.1.source()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
//! A cache of services
|
||||
|
||||
#![doc(html_root_url = "https://docs.rs/tower-ready-cache/0.1.0")]
|
||||
#![deny(missing_docs)]
|
||||
#![deny(rust_2018_idioms)]
|
||||
#![allow(elided_lifetimes_in_paths)]
|
||||
#![deny(warnings)]
|
||||
|
||||
pub mod cache;
|
||||
pub mod error;
|
||||
|
||||
pub use self::cache::ReadyCache;
|
|
@ -0,0 +1,81 @@
|
|||
use futures::prelude::*;
|
||||
use tower_ready_cache::{error, ReadyCache};
|
||||
use tower_test::mock;
|
||||
|
||||
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
|
||||
use futures::future::lazy;
|
||||
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
|
||||
}
|
||||
|
||||
type Req = &'static str;
|
||||
type Mock = mock::Mock<Req, Req>;
|
||||
|
||||
#[test]
|
||||
fn poll_ready_inner_failure() {
|
||||
let mut cache = ReadyCache::<usize, Mock, Req>::default();
|
||||
|
||||
let (service0, mut handle0) = mock::pair::<Req, Req>();
|
||||
handle0.send_error("doom");
|
||||
cache.push(0, service0);
|
||||
|
||||
let (service1, mut handle1) = mock::pair::<Req, Req>();
|
||||
handle1.allow(1);
|
||||
cache.push(1, service1);
|
||||
|
||||
with_task(|| {
|
||||
let error::Failed(key, err) = cache
|
||||
.poll_pending()
|
||||
.err()
|
||||
.expect("poll_ready should fail when exhausted");
|
||||
assert_eq!(key, 0);
|
||||
assert_eq!(format!("{}", err), "doom");
|
||||
});
|
||||
|
||||
assert_eq!(cache.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_ready_not_ready() {
|
||||
let mut cache = ReadyCache::<usize, Mock, Req>::default();
|
||||
|
||||
let (service0, mut handle0) = mock::pair::<Req, Req>();
|
||||
handle0.allow(0);
|
||||
cache.push(0, service0);
|
||||
|
||||
let (service1, mut handle1) = mock::pair::<Req, Req>();
|
||||
handle1.allow(0);
|
||||
cache.push(1, service1);
|
||||
|
||||
with_task(|| {
|
||||
assert!(cache.poll_pending().expect("must succeed").is_not_ready());
|
||||
});
|
||||
|
||||
assert_eq!(cache.ready_len(), 0);
|
||||
assert_eq!(cache.pending_len(), 2);
|
||||
assert_eq!(cache.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_ready_promotes_inner() {
|
||||
let mut cache = ReadyCache::<usize, Mock, Req>::default();
|
||||
|
||||
let (service0, mut handle0) = mock::pair::<Req, Req>();
|
||||
handle0.allow(1);
|
||||
cache.push(0, service0);
|
||||
|
||||
let (service1, mut handle1) = mock::pair::<Req, Req>();
|
||||
handle1.allow(1);
|
||||
cache.push(1, service1);
|
||||
|
||||
assert_eq!(cache.ready_len(), 0);
|
||||
assert_eq!(cache.pending_len(), 2);
|
||||
assert_eq!(cache.len(), 2);
|
||||
|
||||
with_task(|| {
|
||||
assert!(cache.poll_pending().expect("must succeed").is_ready());
|
||||
});
|
||||
|
||||
assert_eq!(cache.ready_len(), 2);
|
||||
assert_eq!(cache.pending_len(), 0);
|
||||
assert_eq!(cache.len(), 2);
|
||||
}
|
Loading…
Reference in New Issue