Tidy up tower::load (#445)

This also renames the `Instrument` trait, and related types, to better
reflect what they do. Specifically, the trait is now called
`TrackCompletion`, and `NoInstrument` is called `CompleteOnResponse`.

Also brings back balance example and makes it compile.
This commit is contained in:
Jon Gjengset 2020-04-20 14:55:40 -04:00 committed by GitHub
parent 05b165056b
commit 39112cb0ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 328 additions and 245 deletions

View File

@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable, 1.39.0]
rust: [stable, 1.40.0]
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
@ -77,7 +77,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable, beta, nightly, 1.39.0]
rust: [stable, beta, nightly, 1.40.0]
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1

View File

@ -32,7 +32,7 @@ discover = []
filter = []
hedge = ["filter", "futures-util", "hdrhistogram", "tokio/time"]
limit = ["tokio/time"]
load = ["discover", "tokio/time"]
load = ["tokio/time"]
load-shed = []
make = ["tokio/io-std"]
ready-cache = ["futures-util", "indexmap", "tokio/sync"]
@ -73,4 +73,4 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["full"]
features = ["full"]

View File

@ -1,21 +1,22 @@
//! Exercises load balancers with mocked services.
use futures_core::TryStream;
use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::time::{self, Instant};
use tower::balance as lb;
use tower::discover::{Change, Discover};
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load;
use tower::util::ServiceExt;
use tower_balance as lb;
use tower_discover::{Change, Discover};
use tower_limit::concurrency::ConcurrencyLimit;
use tower_load as load;
use tower_service::Service;
const REQUESTS: usize = 100_000;
@ -61,13 +62,15 @@ async fn main() {
d,
DEFAULT_RTT,
decay,
load::NoInstrument,
load::CompleteOnResponse::default(),
));
run("P2C+PeakEWMA...", pe).await;
let d = gen_disco();
let ll =
lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument));
let ll = lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));
run("P2C+LeastLoaded...", ll).await;
}
@ -78,20 +81,19 @@ type Key = usize;
#[pin_project]
struct Disco<S>(Vec<(Key, S)>);
impl<S> Discover for Disco<S>
impl<S> Stream for Disco<S>
where
S: Service<Req, Response = Rsp, Error = Error>,
{
type Key = Key;
type Service = S;
type Error = Error;
fn poll_discover(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
type Item = Result<Change<Key, S>, Error>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.pop() {
Some((k, service)) => Poll::Ready(Ok(Change::Insert(k, service))),
None => Poll::Pending,
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
None => {
// there may be more later
Poll::Pending
}
}
}
}
@ -132,7 +134,7 @@ async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
where
D: Discover + Unpin + Send + 'static,
D::Error: Into<Error>,
D::Key: Clone + Send,
D::Key: Clone + Send + Hash,
D::Service: Service<Req, Response = Rsp> + load::Load + Send,
<D::Service as Service<Req>>::Error: Into<Error>,
<D::Service as Service<Req>>::Future: Send,

View File

@ -0,0 +1,92 @@
//! Application-specific request completion semantics.
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Attaches `H`-typed completion tracker to `V` typed values.
///
/// Handles (of type `H`) are intended to be RAII guards that primarily implement `Drop` and update
/// load metric state as they are dropped. This trait allows implementors to "forward" the handle
/// to later parts of the request-handling pipeline, so that the handle is only dropped when the
/// request has truly completed.
///
/// This utility allows load metrics to have a protocol-agnostic means to track streams past their
/// initial response future. For example, if `V` represents an HTTP response type, an
/// implementation could add `H`-typed handles to each response's extensions to detect when all the
/// response's extensions have been dropped.
///
/// A base `impl<H, V> TrackCompletion<H, V> for CompleteOnResponse` is provided to drop the handle
/// once the response future is resolved. This is appropriate when a response is discrete and
/// cannot comprise multiple messages.
///
/// In many cases, the `Output` type is simply `V`. However, `TrackCompletion` may alter the type
/// in order to instrument it appropriately. For example, an HTTP `TrackCompletion` may modify the
/// body type: so a `TrackCompletion` that takes values of type `http::Response<A>` may output
/// values of type `http::Response<B>`.
pub trait TrackCompletion<H, V>: Clone {
/// The instrumented value type.
type Output;
/// Attaches a `H`-typed handle to a `V`-typed value.
fn track_completion(&self, handle: H, value: V) -> Self::Output;
}
/// A `TrackCompletion` implementation that considers the request completed when the response
/// future is resolved.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct CompleteOnResponse;
/// Attaches a `C`-typed completion tracker to the result of an `F`-typed `Future`.
#[pin_project]
#[derive(Debug)]
pub struct TrackCompletionFuture<F, C, H> {
#[pin]
future: F,
handle: Option<H>,
completion: C,
}
// ===== impl InstrumentFuture =====
impl<F, C, H> TrackCompletionFuture<F, C, H> {
/// Wraps a future, propagating the tracker into its value if successful.
pub fn new(completion: C, handle: H, future: F) -> Self {
TrackCompletionFuture {
future,
completion,
handle: Some(handle),
}
}
}
impl<F, C, H, T, E> Future for TrackCompletionFuture<F, C, H>
where
F: Future<Output = Result<T, E>>,
C: TrackCompletion<H, T>,
{
type Output = Result<C::Output, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let rsp = ready!(this.future.poll(cx))?;
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.completion.track_completion(h, rsp)))
}
}
// ===== CompleteOnResponse =====
impl<H, V> TrackCompletion<H, V> for CompleteOnResponse {
type Output = V;
fn track_completion(&self, handle: H, value: V) -> V {
drop(handle);
value
}
}

View File

@ -1,17 +1,20 @@
//! A constant `Load` implementation. Primarily useful for testing.
//! A constant `Load` implementation.
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::Load;
use pin_project::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;
/// Wraps a type so that `Load::load` returns a constant value.
/// Wraps a type so that it implements `Load` and returns a constant load metric.
///
/// This load estimator is primarily useful for testing.
#[pin_project]
#[derive(Debug)]
pub struct Constant<T, M> {
@ -55,6 +58,7 @@ where
}
/// Proxies `Discover` such that all changes are wrapped with a constant load.
#[cfg(feature = "discover")]
impl<D: Discover + Unpin, M: Copy> Stream for Constant<D, M> {
type Item = Result<Change<D::Key, Constant<D::Service, M>>, D::Error>;

View File

@ -1,86 +0,0 @@
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Attaches `I`-typed instruments to `V` typed values.
///
/// This utility allows load metrics to have a protocol-agnostic means to track streams
/// past their initial response future. For example, if `V` represents an HTTP response
/// type, an implementation could add `H`-typed handles to each response's extensions to
/// detect when the response is dropped.
///
/// Handles are intended to be RAII guards that primarily implement `Drop` and update load
/// metric state as they are dropped.
///
/// A base `impl<H, V> Instrument<H, V> for NoInstrument` is provided to drop the handle
/// immediately. This is appropriate when a response is discrete and cannot comprise
/// multiple messages.
///
/// In many cases, the `Output` type is simply `V`. However, `Instrument` may alter the
/// type in order to instrument it appropriately. For example, an HTTP Instrument may
/// modify the body type: so an `Instrument` that takes values of type `http::Response<A>`
/// may output values of type `http::Response<B>`.
pub trait Instrument<H, V>: Clone {
/// The instrumented value type.
type Output;
/// Attaches an `H`-typed handle to a `V`-typed value.
fn instrument(&self, handle: H, value: V) -> Self::Output;
}
/// A `Instrument` implementation that drops each instrument immediately.
#[derive(Clone, Copy, Debug)]
pub struct NoInstrument;
/// Attaches a `I`-typed instruments to the result of an `F`-typed `Future`.
#[pin_project]
#[derive(Debug)]
pub struct InstrumentFuture<F, I, H> {
#[pin]
future: F,
handle: Option<H>,
instrument: I,
}
// ===== impl InstrumentFuture =====
impl<F, I, H> InstrumentFuture<F, I, H> {
/// Wraps a future, instrumenting its value if successful.
pub fn new(instrument: I, handle: H, future: F) -> Self {
InstrumentFuture {
future,
instrument,
handle: Some(handle),
}
}
}
impl<F, I, H, T, E> Future for InstrumentFuture<F, I, H>
where
F: Future<Output = Result<T, E>>,
I: Instrument<H, T>,
{
type Output = Result<I::Output, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let rsp = ready!(this.future.poll(cx))?;
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.instrument.instrument(h, rsp)))
}
}
// ===== NoInstrument =====
impl<H, V> Instrument<H, V> for NoInstrument {
type Output = V;
fn instrument(&self, handle: H, value: V) -> V {
drop(handle);
value
}
}

View File

@ -1,22 +1,85 @@
//! Abstractions and utilties for measuring a service's load.
//! Service load measurement
//!
//! This module provides the [`Load`] trait, which allows measuring how loaded a service is.
//! It also provides several wrapper types that measure load in different ways:
//!
//! - [`Constant`] — Always returns the same constant load value for a service.
//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests.
//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service.
//!
//! In general, you will want to use one of these when using the types in [`tower::balance`] which
//! balance services depending on their load. Which load metric to use depends on your exact
//! use-case, but the ones above should get you quite far!
//!
//! When the `discover` feature is enabled, wrapper types for [`tower::discover::Discover`] that
//! wrap the discovered services with the given load estimator are also provided.
//!
//! # When does a request complete?
//!
//! For many applications, the request life-cycle is relatively simple: when a service responds to
//! a request, that request is done, and the system can forget about it. However, for some
//! applications, the service may respond to the initial request while other parts of the system
//! are still acting on that request. In such an application, the system load must take these
//! requests into account as well, or risk the system underestimating its own load.
//!
//! To support these use-cases, the load estimators in this module are parameterized by the
//! [`TrackCompletion`] trait, with [`CompleteOnResponse`] as the default type. The behavior of
//! `CompleteOnOnResponse` is what you would normally expect for a request-response cycle: when the
//! response is produced, the request is considered "finished", and load goes down. This can be
//! overriden by your own user-defined type to track more complex request completion semantics. See
//! the documentation for [`tower::load::completion`] for more details.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! use tower::util::ServiceExt;
//! # #[cfg(feature = "util")]
//! use tower::{load::Load, Service};
//! # #[cfg(feature = "util")]
//! async fn simple_balance<S1, S2, R>(
//! svc1: &mut S1,
//! svc2: &mut S2,
//! request: R
//! ) -> Result<S1::Response, S1::Error>
//! where
//! S1: Load + Service<R>,
//! S2: Load<Metric = S1::Metric> + Service<R, Response = S1::Response, Error = S1::Error>
//! {
//! if svc1.load() < svc2.load() {
//! svc1.ready_and().await?.call(request).await
//! } else {
//! svc2.ready_and().await?.call(request).await
//! }
//! }
//! ```
// TODO: a custom completion example would be good here
pub mod completion;
mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;
pub use self::{
completion::{CompleteOnResponse, TrackCompletion},
constant::Constant,
instrument::{Instrument, InstrumentFuture, NoInstrument},
peak_ewma::{PeakEwma, PeakEwmaDiscover},
pending_requests::{PendingRequests, PendingRequestsDiscover},
peak_ewma::PeakEwma,
pending_requests::PendingRequests,
};
/// Exposes a load metric.
#[cfg(feature = "discover")]
pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover};
/// Types that implement this trait can give an estimate of how loaded they are.
///
/// See the module documentation for more details.
pub trait Load {
/// A comparable load metric. Lesser values are "preferable" to greater values.
/// A comparable load metric.
///
/// Lesser values indicate that the service is less loaded, and should be preferred for new
/// requests over another service with a higher value.
type Metric: PartialOrd;
/// Obtains a service's load.
/// Estimate the service's current load.
fn load(&self) -> Self::Metric;
}

View File

@ -1,14 +1,17 @@
//! A `Load` implementation that PeakEWMA on response latency.
//! A `Load` implementation that measures load using the PeakEWMA response latency.
use super::Load;
use super::{Instrument, InstrumentFuture, NoInstrument};
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::Load;
use std::task::{Context, Poll};
use std::{
sync::{Arc, Mutex},
time::Duration,
@ -17,7 +20,7 @@ use tokio::time::Instant;
use tower_service::Service;
use tracing::trace;
/// Wraps an `S`-typed Service with Peak-EWMA load measurement.
/// Measures the load of the underlying service using Peak-EWMA load measurement.
///
/// `PeakEwma` implements `Load` with the `Cost` metric that estimates the amount of
/// pending work to an endpoint. Work is calculated by multiplying the
@ -26,11 +29,6 @@ use tracing::trace;
/// worst-case latencies. Over time, the peak latency value decays towards the moving
/// average of latencies to the endpoint.
///
/// As requests are sent to the underlying service, an `I`-typed instrumentation strategy
/// is used to track responses to measure latency in an application-specific way. The
/// default strategy measures latency as the elapsed time from the request being issued to
/// the underlying service to the response future being satisfied (or dropped).
///
/// When no latency information has been measured for an endpoint, an arbitrary default
/// RTT of 1 second is used to prevent the endpoint from being overloaded before a
/// meaningful baseline can be established..
@ -43,22 +41,23 @@ use tracing::trace;
/// [finagle]:
/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
#[derive(Debug)]
pub struct PeakEwma<S, I = NoInstrument> {
pub struct PeakEwma<S, C = CompleteOnResponse> {
service: S,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
instrument: I,
completion: C,
}
/// Wraps a `D`-typed stream of discovery updates with `PeakEwma`.
/// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
#[pin_project]
#[derive(Debug)]
pub struct PeakEwmaDiscover<D, I = NoInstrument> {
#[cfg(feature = "discover")]
pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
decay_ns: f64,
default_rtt: Duration,
instrument: I,
completion: C,
}
/// Represents the relative cost of communicating with a service.
@ -87,65 +86,14 @@ const NANOS_PER_MILLI: f64 = 1_000_000.0;
// ===== impl PeakEwma =====
impl<D, I> PeakEwmaDiscover<D, I> {
/// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
/// added services.
///
/// They `decay` value determines over what time period a RTT estimate should
/// decay.
pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, instrument: I) -> Self
where
D: Discover,
D::Service: Service<Request>,
I: Instrument<Handle, <D::Service as Service<Request>>::Response>,
{
PeakEwmaDiscover {
discover,
decay_ns: nanos(decay),
default_rtt,
instrument,
}
}
}
impl<D, I> Stream for PeakEwmaDiscover<D, I>
where
D: Discover,
I: Clone,
{
type Item = Result<Change<D::Key, PeakEwma<D::Service, I>>, D::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Change::Remove(k)) => Change::Remove(k),
Some(Change::Insert(k, svc)) => {
let peak_ewma = PeakEwma::new(
svc,
*this.default_rtt,
*this.decay_ns,
this.instrument.clone(),
);
Change::Insert(k, peak_ewma)
}
};
Poll::Ready(Some(Ok(change)))
}
}
// ===== impl PeakEwma =====
impl<S, I> PeakEwma<S, I> {
fn new(service: S, default_rtt: Duration, decay_ns: f64, instrument: I) -> Self {
impl<S, C> PeakEwma<S, C> {
/// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency.
pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self {
Self {
service,
decay_ns,
rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
instrument,
completion,
}
}
@ -158,29 +106,29 @@ impl<S, I> PeakEwma<S, I> {
}
}
impl<S, I, Request> Service<Request> for PeakEwma<S, I>
impl<S, C, Request> Service<Request> for PeakEwma<S, C>
where
S: Service<Request>,
I: Instrument<Handle, S::Response>,
C: TrackCompletion<Handle, S::Response>,
{
type Response = I::Output;
type Response = C::Output;
type Error = S::Error;
type Future = InstrumentFuture<S::Future, I, Handle>;
type Future = TrackCompletionFuture<S::Future, C, Handle>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
InstrumentFuture::new(
self.instrument.clone(),
TrackCompletionFuture::new(
self.completion.clone(),
self.handle(),
self.service.call(req),
)
}
}
impl<S, I> Load for PeakEwma<S, I> {
impl<S, C> Load for PeakEwma<S, C> {
type Metric = Cost;
fn load(&self) -> Self::Metric {
@ -201,13 +149,67 @@ impl<S, I> Load for PeakEwma<S, I> {
}
}
impl<S, I> PeakEwma<S, I> {
impl<S, C> PeakEwma<S, C> {
fn update_estimate(&self) -> f64 {
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
rtt.decay(self.decay_ns)
}
}
// ===== impl PeakEwmaDiscover =====
#[cfg(feature = "discover")]
impl<D, C> PeakEwmaDiscover<D, C> {
/// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
/// added services.
///
/// They `decay` value determines over what time period a RTT estimate should
/// decay.
pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self
where
D: Discover,
D::Service: Service<Request>,
C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
{
PeakEwmaDiscover {
discover,
decay_ns: nanos(decay),
default_rtt,
completion,
}
}
}
#[cfg(feature = "discover")]
impl<D, C> Stream for PeakEwmaDiscover<D, C>
where
D: Discover,
C: Clone,
{
type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Change::Remove(k)) => Change::Remove(k),
Some(Change::Insert(k, svc)) => {
let peak_ewma = PeakEwma::new(
svc,
*this.default_rtt,
*this.decay_ns,
this.completion.clone(),
);
Change::Insert(k, peak_ewma)
}
};
Poll::Ready(Some(Ok(change)))
}
}
// ===== impl RttEstimate =====
impl RttEstimate {
@ -336,7 +338,7 @@ mod tests {
Svc,
Duration::from_millis(10),
NANOS_PER_MILLI * 1_000.0,
NoInstrument,
CompleteOnResponse,
);
let Cost(load) = svc.load();
assert_eq!(load, 10.0 * NANOS_PER_MILLI);
@ -360,7 +362,7 @@ mod tests {
Svc,
Duration::from_millis(20),
NANOS_PER_MILLI * 1_000.0,
NoInstrument,
CompleteOnResponse,
);
assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));

View File

@ -1,37 +1,40 @@
//! A `Load` implementation that uses the count of in-flight requests.
//! A `Load` implementation that measures load using the number of in-flight requests.
use super::Load;
use super::{Instrument, InstrumentFuture, NoInstrument};
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::Load;
use std::sync::Arc;
use std::{
pin::Pin,
task::{Context, Poll},
};
use std::task::{Context, Poll};
use tower_service::Service;
/// Expresses load based on the number of currently-pending requests.
/// Measures the load of the underlying service using the number of currently-pending requests.
#[derive(Debug)]
pub struct PendingRequests<S, I = NoInstrument> {
pub struct PendingRequests<S, C = CompleteOnResponse> {
service: S,
ref_count: RefCount,
instrument: I,
completion: C,
}
/// Shared between instances of `PendingRequests` and `Handle` to track active
/// references.
/// Shared between instances of `PendingRequests` and `Handle` to track active references.
#[derive(Clone, Debug, Default)]
struct RefCount(Arc<()>);
/// Wraps `inner`'s services with `PendingRequests`.
/// Wraps a `D`-typed stream of discovered services with `PendingRequests`.
#[pin_project]
#[derive(Debug)]
pub struct PendingRequestsDiscover<D, I = NoInstrument> {
#[cfg(feature = "discover")]
pub struct PendingRequestsDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
instrument: I,
completion: C,
}
/// Represents the number of currently-pending requests to a given service.
@ -44,11 +47,12 @@ pub struct Handle(RefCount);
// ===== impl PendingRequests =====
impl<S, I> PendingRequests<S, I> {
fn new(service: S, instrument: I) -> Self {
impl<S, C> PendingRequests<S, C> {
/// Wraps an `S`-typed service so that its load is tracked by the number of pending requests.
pub fn new(service: S, completion: C) -> Self {
Self {
service,
instrument,
completion,
ref_count: RefCount::default(),
}
}
@ -58,7 +62,7 @@ impl<S, I> PendingRequests<S, I> {
}
}
impl<S, I> Load for PendingRequests<S, I> {
impl<S, C> Load for PendingRequests<S, C> {
type Metric = Count;
fn load(&self) -> Count {
@ -67,22 +71,22 @@ impl<S, I> Load for PendingRequests<S, I> {
}
}
impl<S, I, Request> Service<Request> for PendingRequests<S, I>
impl<S, C, Request> Service<Request> for PendingRequests<S, C>
where
S: Service<Request>,
I: Instrument<Handle, S::Response>,
C: TrackCompletion<Handle, S::Response>,
{
type Response = I::Output;
type Response = C::Output;
type Error = S::Error;
type Future = InstrumentFuture<S::Future, I, Handle>;
type Future = TrackCompletionFuture<S::Future, C, Handle>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
InstrumentFuture::new(
self.instrument.clone(),
TrackCompletionFuture::new(
self.completion.clone(),
self.handle(),
self.service.call(req),
)
@ -91,27 +95,29 @@ where
// ===== impl PendingRequestsDiscover =====
impl<D, I> PendingRequestsDiscover<D, I> {
#[cfg(feature = "discover")]
impl<D, C> PendingRequestsDiscover<D, C> {
/// Wraps a `Discover``, wrapping all of its services with `PendingRequests`.
pub fn new<Request>(discover: D, instrument: I) -> Self
pub fn new<Request>(discover: D, completion: C) -> Self
where
D: Discover,
D::Service: Service<Request>,
I: Instrument<Handle, <D::Service as Service<Request>>::Response>,
C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
{
Self {
discover,
instrument,
completion,
}
}
}
impl<D, I> Stream for PendingRequestsDiscover<D, I>
#[cfg(feature = "discover")]
impl<D, C> Stream for PendingRequestsDiscover<D, C>
where
D: Discover,
I: Clone,
C: Clone,
{
type Item = Result<Change<D::Key, PendingRequests<D::Service, I>>, D::Error>;
type Item = Result<Change<D::Key, PendingRequests<D::Service, C>>, D::Error>;
/// Yields the next discovery change set.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -120,7 +126,7 @@ where
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.instrument.clone())),
Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.completion.clone())),
Some(Remove(k)) => Remove(k),
};
@ -159,7 +165,7 @@ mod tests {
#[test]
fn default() {
let mut svc = PendingRequests::new(Svc, NoInstrument);
let mut svc = PendingRequests::new(Svc, CompleteOnResponse);
assert_eq!(svc.load(), Count(0));
let rsp0 = svc.call(());
@ -176,12 +182,12 @@ mod tests {
}
#[test]
fn instrumented() {
fn with_completion() {
#[derive(Clone)]
struct IntoHandle;
impl Instrument<Handle, ()> for IntoHandle {
impl TrackCompletion<Handle, ()> for IntoHandle {
type Output = Handle;
fn instrument(&self, i: Handle, (): ()) -> Handle {
fn track_completion(&self, i: Handle, (): ()) -> Handle {
i
}
}