port tower-buffer over to generic error approach
This commit is contained in:
parent
b12a3e3ae9
commit
2c3a500c10
|
@ -2,7 +2,38 @@
|
||||||
|
|
||||||
use std::{error, fmt};
|
use std::{error, fmt};
|
||||||
|
|
||||||
pub(crate) type Error = Box<dyn error::Error + Send + Sync>;
|
// pub(crate) type Error = Box<dyn error::Error + Send + Sync>;
|
||||||
|
|
||||||
|
/// Cloneable dyn Error
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Error {
|
||||||
|
inner: std::sync::Arc<dyn std::error::Error + Send + Sync + 'static>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Error {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
self.inner.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E> From<E> for Error
|
||||||
|
where
|
||||||
|
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
|
||||||
|
{
|
||||||
|
fn from(error: E) -> Self {
|
||||||
|
let boxed = error.into();
|
||||||
|
let inner = boxed.into();
|
||||||
|
Self { inner }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Deref for Error {
|
||||||
|
type Target = dyn std::error::Error + Send + Sync + 'static;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
self.inner.deref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Error yielded when a mocked service does not yet accept requests.
|
/// Error yielded when a mocked service does not yet accept requests.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
|
@ -24,7 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures", "service"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["log"]
|
default = ["log", "buffer"]
|
||||||
log = ["tracing/log"]
|
log = ["tracing/log"]
|
||||||
balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
|
balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
|
||||||
buffer = ["tokio/sync", "tokio/rt-core"]
|
buffer = ["tokio/sync", "tokio/rt-core"]
|
||||||
|
|
|
@ -1,226 +0,0 @@
|
||||||
//! Exercises load balancers with mocked services.
|
|
||||||
|
|
||||||
use futures_core::{Stream, TryStream};
|
|
||||||
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
|
|
||||||
use hdrhistogram::Histogram;
|
|
||||||
use pin_project::pin_project;
|
|
||||||
use rand::{self, Rng};
|
|
||||||
use std::hash::Hash;
|
|
||||||
use std::time::Duration;
|
|
||||||
use std::{
|
|
||||||
pin::Pin,
|
|
||||||
task::{Context, Poll},
|
|
||||||
};
|
|
||||||
use tokio::time::{self, Instant};
|
|
||||||
use tower::balance as lb;
|
|
||||||
use tower::discover::{Change, Discover};
|
|
||||||
use tower::limit::concurrency::ConcurrencyLimit;
|
|
||||||
use tower::load;
|
|
||||||
use tower::util::ServiceExt;
|
|
||||||
use tower_service::Service;
|
|
||||||
|
|
||||||
const REQUESTS: usize = 100_000;
|
|
||||||
const CONCURRENCY: usize = 500;
|
|
||||||
const DEFAULT_RTT: Duration = Duration::from_millis(30);
|
|
||||||
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
|
|
||||||
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
|
|
||||||
Duration::from_millis(1),
|
|
||||||
Duration::from_millis(5),
|
|
||||||
Duration::from_millis(10),
|
|
||||||
Duration::from_millis(10),
|
|
||||||
Duration::from_millis(10),
|
|
||||||
Duration::from_millis(100),
|
|
||||||
Duration::from_millis(100),
|
|
||||||
Duration::from_millis(100),
|
|
||||||
Duration::from_millis(500),
|
|
||||||
Duration::from_millis(1000),
|
|
||||||
];
|
|
||||||
|
|
||||||
struct Summary {
|
|
||||||
latencies: Histogram<u64>,
|
|
||||||
start: Instant,
|
|
||||||
count_by_instance: [usize; 10],
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() {
|
|
||||||
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
|
|
||||||
|
|
||||||
println!("REQUESTS={}", REQUESTS);
|
|
||||||
println!("CONCURRENCY={}", CONCURRENCY);
|
|
||||||
println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
|
|
||||||
print!("MAX_ENDPOINT_LATENCIES=[");
|
|
||||||
for max in &MAX_ENDPOINT_LATENCIES {
|
|
||||||
let l = max.as_secs() * 1_000 + u64::from(max.subsec_nanos() / 1_000 / 1_000);
|
|
||||||
print!("{}ms, ", l);
|
|
||||||
}
|
|
||||||
println!("]");
|
|
||||||
|
|
||||||
let decay = Duration::from_secs(10);
|
|
||||||
let d = gen_disco();
|
|
||||||
let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
|
|
||||||
d,
|
|
||||||
DEFAULT_RTT,
|
|
||||||
decay,
|
|
||||||
load::CompleteOnResponse::default(),
|
|
||||||
));
|
|
||||||
run("P2C+PeakEWMA...", pe).await;
|
|
||||||
|
|
||||||
let d = gen_disco();
|
|
||||||
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
|
|
||||||
d,
|
|
||||||
load::CompleteOnResponse::default(),
|
|
||||||
));
|
|
||||||
run("P2C+LeastLoaded...", ll).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
|
||||||
|
|
||||||
type Key = usize;
|
|
||||||
|
|
||||||
#[pin_project]
|
|
||||||
struct Disco<S>(Vec<(Key, S)>);
|
|
||||||
|
|
||||||
impl<S> Stream for Disco<S>
|
|
||||||
where
|
|
||||||
S: Service<Req, Response = Rsp, Error = Error>,
|
|
||||||
{
|
|
||||||
type Item = Result<Change<Key, S>, Error>;
|
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
match self.project().0.pop() {
|
|
||||||
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
|
|
||||||
None => {
|
|
||||||
// there may be more later
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gen_disco() -> impl Discover<
|
|
||||||
Key = Key,
|
|
||||||
Error = Error,
|
|
||||||
Service = ConcurrencyLimit<
|
|
||||||
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
|
|
||||||
>,
|
|
||||||
> + Send {
|
|
||||||
Disco(
|
|
||||||
MAX_ENDPOINT_LATENCIES
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(instance, latency)| {
|
|
||||||
let svc = tower::service_fn(move |_| {
|
|
||||||
let start = Instant::now();
|
|
||||||
|
|
||||||
let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
|
|
||||||
.saturating_add(latency.as_secs().saturating_mul(1_000));
|
|
||||||
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
|
|
||||||
|
|
||||||
async move {
|
|
||||||
time::delay_until(start + latency).await;
|
|
||||||
let latency = start.elapsed();
|
|
||||||
Ok(Rsp { latency, instance })
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
|
|
||||||
where
|
|
||||||
D: Discover + Unpin + Send + 'static,
|
|
||||||
D::Error: Into<Error>,
|
|
||||||
D::Key: Clone + Send + Hash,
|
|
||||||
D::Service: Service<Req, Response = Rsp> + load::Load + Send,
|
|
||||||
<D::Service as Service<Req>>::Error: Into<Error>,
|
|
||||||
<D::Service as Service<Req>>::Future: Send,
|
|
||||||
<D::Service as load::Load>::Metric: std::fmt::Debug,
|
|
||||||
{
|
|
||||||
println!("{}", name);
|
|
||||||
|
|
||||||
let requests = stream::repeat(Req).take(REQUESTS);
|
|
||||||
let service = ConcurrencyLimit::new(lb, CONCURRENCY);
|
|
||||||
let responses = service.call_all(requests).unordered();
|
|
||||||
|
|
||||||
compute_histo(responses).await.unwrap().report();
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
|
|
||||||
where
|
|
||||||
S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
|
|
||||||
{
|
|
||||||
let mut summary = Summary::new();
|
|
||||||
while let Some(rsp) = times.try_next().await? {
|
|
||||||
summary.count(rsp);
|
|
||||||
}
|
|
||||||
Ok(summary)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Summary {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
// The max delay is 2000ms. At 3 significant figures.
|
|
||||||
latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
|
|
||||||
start: Instant::now(),
|
|
||||||
count_by_instance: [0; 10],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn count(&mut self, rsp: Rsp) {
|
|
||||||
let ms = rsp.latency.as_secs() * 1_000;
|
|
||||||
let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
|
|
||||||
self.latencies += ms;
|
|
||||||
self.count_by_instance[rsp.instance] += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn report(&self) {
|
|
||||||
let mut total = 0;
|
|
||||||
for c in &self.count_by_instance {
|
|
||||||
total += c;
|
|
||||||
}
|
|
||||||
for (i, c) in self.count_by_instance.iter().enumerate() {
|
|
||||||
let p = *c as f64 / total as f64 * 100.0;
|
|
||||||
println!(" [{:02}] {:>5.01}%", i, p);
|
|
||||||
}
|
|
||||||
|
|
||||||
println!(" wall {:4}s", self.start.elapsed().as_secs());
|
|
||||||
|
|
||||||
if self.latencies.len() < 2 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
println!(" p50 {:4}ms", self.latencies.value_at_quantile(0.5));
|
|
||||||
|
|
||||||
if self.latencies.len() < 10 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
println!(" p90 {:4}ms", self.latencies.value_at_quantile(0.9));
|
|
||||||
|
|
||||||
if self.latencies.len() < 50 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
println!(" p95 {:4}ms", self.latencies.value_at_quantile(0.95));
|
|
||||||
|
|
||||||
if self.latencies.len() < 100 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
println!(" p99 {:4}ms", self.latencies.value_at_quantile(0.99));
|
|
||||||
|
|
||||||
if self.latencies.len() < 1000 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
println!(" p999 {:4}ms", self.latencies.value_at_quantile(0.999));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct Req;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Rsp {
|
|
||||||
latency: Duration,
|
|
||||||
instance: usize,
|
|
||||||
}
|
|
|
@ -1,47 +1,12 @@
|
||||||
//! Error types for the `Buffer` middleware.
|
//! Error types for the `Buffer` middleware.
|
||||||
|
|
||||||
use crate::BoxError;
|
use std::fmt;
|
||||||
use std::{fmt, sync::Arc};
|
|
||||||
|
|
||||||
/// An error produced by a `Service` wrapped by a `Buffer`
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ServiceError {
|
|
||||||
inner: Arc<BoxError>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An error produced when the a buffer's worker closes unexpectedly.
|
/// An error produced when the a buffer's worker closes unexpectedly.
|
||||||
pub struct Closed {
|
pub struct Closed {
|
||||||
_p: (),
|
_p: (),
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl ServiceError =====
|
|
||||||
|
|
||||||
impl ServiceError {
|
|
||||||
pub(crate) fn new(inner: BoxError) -> ServiceError {
|
|
||||||
let inner = Arc::new(inner);
|
|
||||||
ServiceError { inner }
|
|
||||||
}
|
|
||||||
|
|
||||||
// Private to avoid exposing `Clone` trait as part of the public API
|
|
||||||
pub(crate) fn clone(&self) -> ServiceError {
|
|
||||||
ServiceError {
|
|
||||||
inner: self.inner.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for ServiceError {
|
|
||||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(fmt, "buffered service failed: {}", self.inner)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for ServiceError {
|
|
||||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
|
||||||
Some(&**self.inner)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Closed =====
|
// ===== impl Closed =====
|
||||||
|
|
||||||
impl Closed {
|
impl Closed {
|
||||||
|
|
|
@ -4,6 +4,7 @@ use super::{error::Closed, message};
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
use std::{
|
use std::{
|
||||||
|
fmt::Debug,
|
||||||
future::Future,
|
future::Future,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
@ -11,40 +12,79 @@ use std::{
|
||||||
|
|
||||||
/// Future that completes when the buffered service eventually services the submitted request.
|
/// Future that completes when the buffered service eventually services the submitted request.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
pub struct ResponseFuture<S, E2, Response>
|
||||||
pub struct ResponseFuture<T> {
|
where
|
||||||
|
S: crate::Service<Response>,
|
||||||
|
{
|
||||||
#[pin]
|
#[pin]
|
||||||
state: ResponseState<T>,
|
state: ResponseState<S, E2, Response>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, E2, Response> Debug for ResponseFuture<S, E2, Response>
|
||||||
|
where
|
||||||
|
S: crate::Service<Response>,
|
||||||
|
S::Future: Debug,
|
||||||
|
S::Error: Debug,
|
||||||
|
E2: Debug,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("ResponseFuture")
|
||||||
|
.field("state", &self.state)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = ResponseStateProj)]
|
#[pin_project(project = ResponseStateProj)]
|
||||||
#[derive(Debug)]
|
enum ResponseState<S, E2, Response>
|
||||||
enum ResponseState<T> {
|
where
|
||||||
Failed(Option<crate::BoxError>),
|
S: crate::Service<Response>,
|
||||||
Rx(#[pin] message::Rx<T>),
|
{
|
||||||
Poll(#[pin] T),
|
Failed(Option<E2>),
|
||||||
|
Rx(#[pin] message::Rx<S::Future, S::Error>),
|
||||||
|
Poll(#[pin] S::Future),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ResponseFuture<T> {
|
impl<S, E2, Response> Debug for ResponseState<S, E2, Response>
|
||||||
pub(crate) fn new(rx: message::Rx<T>) -> Self {
|
where
|
||||||
|
S: crate::Service<Response>,
|
||||||
|
S::Future: Debug,
|
||||||
|
S::Error: Debug,
|
||||||
|
E2: Debug,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
ResponseState::Failed(e) => f.debug_tuple("ResponseState::Failed").field(e).finish(),
|
||||||
|
ResponseState::Rx(rx) => f.debug_tuple("ResponseState::Rx").field(rx).finish(),
|
||||||
|
ResponseState::Poll(fut) => f.debug_tuple("ResponseState::Pool").field(fut).finish(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, E2, Response> ResponseFuture<S, E2, Response>
|
||||||
|
where
|
||||||
|
S: crate::Service<Response>,
|
||||||
|
{
|
||||||
|
pub(crate) fn new(rx: message::Rx<S::Future, S::Error>) -> Self {
|
||||||
ResponseFuture {
|
ResponseFuture {
|
||||||
state: ResponseState::Rx(rx),
|
state: ResponseState::Rx(rx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn failed(err: crate::BoxError) -> Self {
|
pub(crate) fn failed(err: E2) -> Self {
|
||||||
ResponseFuture {
|
ResponseFuture {
|
||||||
state: ResponseState::Failed(Some(err)),
|
state: ResponseState::Failed(Some(err)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, T, E> Future for ResponseFuture<F>
|
impl<S, E2, Response> Future for ResponseFuture<S, E2, Response>
|
||||||
where
|
where
|
||||||
F: Future<Output = Result<T, E>>,
|
S: crate::Service<Response>,
|
||||||
E: Into<crate::BoxError>,
|
S::Future: Future<Output = Result<S::Response, S::Error>>,
|
||||||
|
S::Error: Into<E2>,
|
||||||
|
crate::buffer::error::Closed: Into<E2>,
|
||||||
{
|
{
|
||||||
type Output = Result<T, crate::BoxError>;
|
type Output = Result<S::Response, E2>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let mut this = self.project();
|
let mut this = self.project();
|
||||||
|
|
|
@ -9,12 +9,13 @@ use tower_service::Service;
|
||||||
/// which means that this layer can only be used on the Tokio runtime.
|
/// which means that this layer can only be used on the Tokio runtime.
|
||||||
///
|
///
|
||||||
/// See the module documentation for more details.
|
/// See the module documentation for more details.
|
||||||
pub struct BufferLayer<Request> {
|
pub struct BufferLayer<Request, E2> {
|
||||||
bound: usize,
|
bound: usize,
|
||||||
_p: PhantomData<fn(Request)>,
|
_p: PhantomData<fn(Request)>,
|
||||||
|
_e: PhantomData<E2>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Request> BufferLayer<Request> {
|
impl<Request, E2> BufferLayer<Request, E2> {
|
||||||
/// Creates a new `BufferLayer` with the provided `bound`.
|
/// Creates a new `BufferLayer` with the provided `bound`.
|
||||||
///
|
///
|
||||||
/// `bound` gives the maximal number of requests that can be queued for the service before
|
/// `bound` gives the maximal number of requests that can be queued for the service before
|
||||||
|
@ -33,25 +34,29 @@ impl<Request> BufferLayer<Request> {
|
||||||
BufferLayer {
|
BufferLayer {
|
||||||
bound,
|
bound,
|
||||||
_p: PhantomData,
|
_p: PhantomData,
|
||||||
|
_e: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, Request> Layer<S> for BufferLayer<Request>
|
impl<S, Request, E2> Layer<S> for BufferLayer<Request, E2>
|
||||||
where
|
where
|
||||||
S: Service<Request> + Send + 'static,
|
S: Service<Request> + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
S::Error: Into<crate::BoxError> + Send + Sync,
|
S::Error: Clone + Into<E2> + Send + Sync + std::fmt::Display,
|
||||||
|
Request: Send + 'static,
|
||||||
|
E2: Send + 'static,
|
||||||
|
crate::buffer::error::Closed: Into<E2>,
|
||||||
Request: Send + 'static,
|
Request: Send + 'static,
|
||||||
{
|
{
|
||||||
type Service = Buffer<S, Request>;
|
type Service = Buffer<S, Request, E2>;
|
||||||
|
|
||||||
fn layer(&self, service: S) -> Self::Service {
|
fn layer(&self, service: S) -> Self::Service {
|
||||||
Buffer::new(service, self.bound)
|
Buffer::new(service, self.bound)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Request> fmt::Debug for BufferLayer<Request> {
|
impl<Request, E2> fmt::Debug for BufferLayer<Request, E2> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
f.debug_struct("BufferLayer")
|
f.debug_struct("BufferLayer")
|
||||||
.field("bound", &self.bound)
|
.field("bound", &self.bound)
|
||||||
|
|
|
@ -1,16 +1,15 @@
|
||||||
use super::error::ServiceError;
|
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
/// Message sent over buffer
|
/// Message sent over buffer
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct Message<Request, Fut> {
|
pub(crate) struct Message<Request, Fut, E> {
|
||||||
pub(crate) request: Request,
|
pub(crate) request: Request,
|
||||||
pub(crate) tx: Tx<Fut>,
|
pub(crate) tx: Tx<Fut, E>,
|
||||||
pub(crate) span: tracing::Span,
|
pub(crate) span: tracing::Span,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Response sender
|
/// Response sender
|
||||||
pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;
|
pub(crate) type Tx<Fut, E> = oneshot::Sender<Result<Fut, E>>;
|
||||||
|
|
||||||
/// Response receiver
|
/// Response receiver
|
||||||
pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>;
|
pub(crate) type Rx<Fut, E> = oneshot::Receiver<Result<Fut, E>>;
|
||||||
|
|
|
@ -13,18 +13,20 @@ use tower_service::Service;
|
||||||
///
|
///
|
||||||
/// See the module documentation for more details.
|
/// See the module documentation for more details.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Buffer<T, Request>
|
pub struct Buffer<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
{
|
{
|
||||||
tx: mpsc::Sender<Message<Request, T::Future>>,
|
tx: mpsc::Sender<Message<Request, S::Future, S::Error>>,
|
||||||
handle: Handle,
|
handle: Handle<S::Error, E2>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, Request> Buffer<T, Request>
|
impl<S, Request, E2> Buffer<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
T::Error: Into<crate::BoxError>,
|
S::Error: Into<E2> + Clone,
|
||||||
|
E2: Send + 'static,
|
||||||
|
crate::buffer::error::Closed: Into<E2>,
|
||||||
{
|
{
|
||||||
/// Creates a new `Buffer` wrapping `service`.
|
/// Creates a new `Buffer` wrapping `service`.
|
||||||
///
|
///
|
||||||
|
@ -43,11 +45,11 @@ where
|
||||||
/// If you do not, all the slots in the buffer may be held up by futures that have just called
|
/// If you do not, all the slots in the buffer may be held up by futures that have just called
|
||||||
/// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
|
/// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
|
||||||
/// requests.
|
/// requests.
|
||||||
pub fn new(service: T, bound: usize) -> Self
|
pub fn new(service: S, bound: usize) -> Self
|
||||||
where
|
where
|
||||||
T: Send + 'static,
|
S: Send + 'static,
|
||||||
T::Future: Send,
|
S::Future: Send,
|
||||||
T::Error: Send + Sync,
|
S::Error: Send + Sync + std::fmt::Display,
|
||||||
Request: Send + 'static,
|
Request: Send + 'static,
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::channel(bound);
|
let (tx, rx) = mpsc::channel(bound);
|
||||||
|
@ -61,10 +63,10 @@ where
|
||||||
/// This is useful if you do not want to spawn directly onto the `tokio` runtime
|
/// This is useful if you do not want to spawn directly onto the `tokio` runtime
|
||||||
/// but instead want to use your own executor. This will return the `Buffer` and
|
/// but instead want to use your own executor. This will return the `Buffer` and
|
||||||
/// the background `Worker` that you can then spawn.
|
/// the background `Worker` that you can then spawn.
|
||||||
pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
|
pub fn pair(service: S, bound: usize) -> (Buffer<S, Request, E2>, Worker<S, Request, E2>)
|
||||||
where
|
where
|
||||||
T: Send + 'static,
|
S: Send + 'static,
|
||||||
T::Error: Send + Sync,
|
S::Error: Send + Sync,
|
||||||
Request: Send + 'static,
|
Request: Send + 'static,
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::channel(bound);
|
let (tx, rx) = mpsc::channel(bound);
|
||||||
|
@ -72,23 +74,25 @@ where
|
||||||
(Buffer { tx, handle }, worker)
|
(Buffer { tx, handle }, worker)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_worker_error(&self) -> crate::BoxError {
|
fn get_worker_error(&self) -> E2 {
|
||||||
self.handle.get_error_on_closed()
|
self.handle.get_error_on_closed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, Request> Service<Request> for Buffer<T, Request>
|
impl<S, Request, E2> Service<Request> for Buffer<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
T::Error: Into<crate::BoxError>,
|
crate::buffer::error::Closed: Into<E2>,
|
||||||
|
S::Error: Into<E2> + Clone,
|
||||||
|
E2: Send + 'static,
|
||||||
{
|
{
|
||||||
type Response = T::Response;
|
type Response = S::Response;
|
||||||
type Error = crate::BoxError;
|
type Error = E2;
|
||||||
type Future = ResponseFuture<T::Future>;
|
type Future = ResponseFuture<S, E2, Request>;
|
||||||
|
|
||||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
// If the inner service has errored, then we error here.
|
// If the inner service has errored, then we error here.
|
||||||
if let Err(_) = ready!(self.tx.poll_ready(cx)) {
|
if ready!(self.tx.poll_ready(cx)).is_err() {
|
||||||
Poll::Ready(Err(self.get_worker_error()))
|
Poll::Ready(Err(self.get_worker_error()))
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
|
@ -126,9 +130,9 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, Request> Clone for Buffer<T, Request>
|
impl<S, Request, E2> Clone for Buffer<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|
|
@ -1,12 +1,10 @@
|
||||||
use super::{
|
use super::{error::Closed, message::Message};
|
||||||
error::{Closed, ServiceError},
|
|
||||||
message::Message,
|
|
||||||
};
|
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
|
marker::PhantomData,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
@ -22,36 +20,38 @@ use tower_service::Service;
|
||||||
/// implement (only call).
|
/// implement (only call).
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Worker<T, Request>
|
pub struct Worker<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
T::Error: Into<crate::BoxError>,
|
S::Error: Into<E2>,
|
||||||
{
|
{
|
||||||
current_message: Option<Message<Request, T::Future>>,
|
current_message: Option<Message<Request, S::Future, S::Error>>,
|
||||||
rx: mpsc::Receiver<Message<Request, T::Future>>,
|
rx: mpsc::Receiver<Message<Request, S::Future, S::Error>>,
|
||||||
service: T,
|
service: S,
|
||||||
finish: bool,
|
finish: bool,
|
||||||
failed: Option<ServiceError>,
|
failed: Option<S::Error>,
|
||||||
handle: Handle,
|
handle: Handle<S::Error, E2>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the error out
|
/// Get the error out
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct Handle {
|
pub(crate) struct Handle<E, E2> {
|
||||||
inner: Arc<Mutex<Option<ServiceError>>>,
|
inner: Arc<Mutex<Option<E>>>,
|
||||||
|
_e: PhantomData<E2>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, Request> Worker<T, Request>
|
impl<S, Request, E2> Worker<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
T::Error: Into<crate::BoxError>,
|
S::Error: Into<E2> + Clone,
|
||||||
{
|
{
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
service: T,
|
service: S,
|
||||||
rx: mpsc::Receiver<Message<Request, T::Future>>,
|
rx: mpsc::Receiver<Message<Request, S::Future, S::Error>>,
|
||||||
) -> (Handle, Worker<T, Request>) {
|
) -> (Handle<S::Error, E2>, Worker<S, Request, E2>) {
|
||||||
let handle = Handle {
|
let handle = Handle {
|
||||||
inner: Arc::new(Mutex::new(None)),
|
inner: Arc::new(Mutex::new(None)),
|
||||||
|
_e: PhantomData,
|
||||||
};
|
};
|
||||||
|
|
||||||
let worker = Worker {
|
let worker = Worker {
|
||||||
|
@ -70,10 +70,11 @@ where
|
||||||
///
|
///
|
||||||
/// If a `Message` is returned, the `bool` is true if this is the first time we received this
|
/// If a `Message` is returned, the `bool` is true if this is the first time we received this
|
||||||
/// message, and false otherwise (i.e., we tried to forward it to the backing service before).
|
/// message, and false otherwise (i.e., we tried to forward it to the backing service before).
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
fn poll_next_msg(
|
fn poll_next_msg(
|
||||||
&mut self,
|
&mut self,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
|
) -> Poll<Option<(Message<Request, S::Future, S::Error>, bool)>> {
|
||||||
if self.finish {
|
if self.finish {
|
||||||
// We've already received None and are shutting down
|
// We've already received None and are shutting down
|
||||||
return Poll::Ready(None);
|
return Poll::Ready(None);
|
||||||
|
@ -105,7 +106,7 @@ where
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn failed(&mut self, error: crate::BoxError) {
|
fn failed(&mut self, error: S::Error) {
|
||||||
// The underlying service failed when we called `poll_ready` on it with the given `error`. We
|
// The underlying service failed when we called `poll_ready` on it with the given `error`. We
|
||||||
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
|
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
|
||||||
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
|
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
|
||||||
|
@ -118,8 +119,6 @@ where
|
||||||
// request. We do this by *first* exposing the error, *then* closing the channel used to
|
// request. We do this by *first* exposing the error, *then* closing the channel used to
|
||||||
// send more requests (so the client will see the error when the send fails), and *then*
|
// send more requests (so the client will see the error when the send fails), and *then*
|
||||||
// sending the error to all outstanding requests.
|
// sending the error to all outstanding requests.
|
||||||
let error = ServiceError::new(error);
|
|
||||||
|
|
||||||
let mut inner = self.handle.inner.lock().unwrap();
|
let mut inner = self.handle.inner.lock().unwrap();
|
||||||
|
|
||||||
if inner.is_some() {
|
if inner.is_some() {
|
||||||
|
@ -139,10 +138,10 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, Request> Future for Worker<T, Request>
|
impl<S, Request, E2> Future for Worker<S, Request, E2>
|
||||||
where
|
where
|
||||||
T: Service<Request>,
|
S: Service<Request>,
|
||||||
T::Error: Into<crate::BoxError>,
|
S::Error: Into<E2> + Clone + std::fmt::Display,
|
||||||
{
|
{
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
|
@ -185,8 +184,7 @@ where
|
||||||
self.current_message = Some(msg);
|
self.current_message = Some(msg);
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(error)) => {
|
||||||
let error = e.into();
|
|
||||||
tracing::debug!({ %error }, "service failed");
|
tracing::debug!({ %error }, "service failed");
|
||||||
drop(_guard);
|
drop(_guard);
|
||||||
self.failed(error);
|
self.failed(error);
|
||||||
|
@ -208,8 +206,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handle {
|
impl<E, E2> Handle<E, E2>
|
||||||
pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
|
where
|
||||||
|
E: Clone + Into<E2>,
|
||||||
|
crate::buffer::error::Closed: Into<E2>,
|
||||||
|
{
|
||||||
|
pub(crate) fn get_error_on_closed(&self) -> E2 {
|
||||||
self.inner
|
self.inner
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -219,10 +221,11 @@ impl Handle {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for Handle {
|
impl<E, E2> Clone for Handle<E, E2> {
|
||||||
fn clone(&self) -> Handle {
|
fn clone(&self) -> Handle<E, E2> {
|
||||||
Handle {
|
Handle {
|
||||||
inner: self.inner.clone(),
|
inner: self.inner.clone(),
|
||||||
|
_e: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,6 +106,7 @@ pub struct ServiceBuilder<L> {
|
||||||
layer: L,
|
layer: L,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::new_without_default)]
|
||||||
impl ServiceBuilder<Identity> {
|
impl ServiceBuilder<Identity> {
|
||||||
/// Create a new `ServiceBuilder`.
|
/// Create a new `ServiceBuilder`.
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
|
@ -125,10 +126,10 @@ impl<L> ServiceBuilder<L> {
|
||||||
|
|
||||||
/// Buffer requests when when the next layer is out of capacity.
|
/// Buffer requests when when the next layer is out of capacity.
|
||||||
#[cfg(feature = "buffer")]
|
#[cfg(feature = "buffer")]
|
||||||
pub fn buffer<Request>(
|
pub fn buffer<Request, E2>(
|
||||||
self,
|
self,
|
||||||
bound: usize,
|
bound: usize,
|
||||||
) -> ServiceBuilder<Stack<crate::buffer::BufferLayer<Request>, L>> {
|
) -> ServiceBuilder<Stack<crate::buffer::BufferLayer<Request, E2>, L>> {
|
||||||
self.layer(crate::buffer::BufferLayer::new(bound))
|
self.layer(crate::buffer::BufferLayer::new(bound))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,6 +81,3 @@ pub use tower_service::Service;
|
||||||
mod sealed {
|
mod sealed {
|
||||||
pub trait Sealed<T> {}
|
pub trait Sealed<T> {}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Alias for a type-erased error type.
|
|
||||||
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
|
|
||||||
|
|
|
@ -79,8 +79,6 @@ async fn when_inner_is_not_ready() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn when_inner_fails() {
|
async fn when_inner_fails() {
|
||||||
use std::error::Error as StdError;
|
|
||||||
|
|
||||||
let (mut service, mut handle) = new_service();
|
let (mut service, mut handle) = new_service();
|
||||||
|
|
||||||
// Make the service NotReady
|
// Make the service NotReady
|
||||||
|
@ -91,13 +89,8 @@ async fn when_inner_fails() {
|
||||||
|
|
||||||
let_worker_work();
|
let_worker_work();
|
||||||
let e = assert_ready_err!(res1.poll());
|
let e = assert_ready_err!(res1.poll());
|
||||||
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
|
|
||||||
let e = e.source().unwrap();
|
|
||||||
|
|
||||||
assert_eq!(e.to_string(), "foobar");
|
assert_eq!(e.to_string(), "foobar");
|
||||||
} else {
|
|
||||||
panic!("unexpected error type: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -110,7 +103,7 @@ async fn poll_ready_when_worker_is_dropped_early() {
|
||||||
|
|
||||||
drop(worker);
|
drop(worker);
|
||||||
|
|
||||||
let err = assert_ready_err!(service.poll_ready());
|
let err: mock::error::Error = assert_ready_err!(service.poll_ready());
|
||||||
|
|
||||||
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
|
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
|
||||||
}
|
}
|
||||||
|
@ -130,14 +123,17 @@ async fn response_future_when_worker_is_dropped_early() {
|
||||||
drop(worker);
|
drop(worker);
|
||||||
|
|
||||||
let_worker_work();
|
let_worker_work();
|
||||||
let err = assert_ready_err!(response.poll());
|
let err: mock::error::Error = assert_ready_err!(response.poll());
|
||||||
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
|
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
type Mock = mock::Mock<&'static str, &'static str>;
|
type Mock = mock::Mock<&'static str, &'static str>;
|
||||||
type Handle = mock::Handle<&'static str, &'static str>;
|
type Handle = mock::Handle<&'static str, &'static str>;
|
||||||
|
|
||||||
fn new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
|
fn new_service() -> (
|
||||||
|
mock::Spawn<Buffer<Mock, &'static str, mock::error::Error>>,
|
||||||
|
Handle,
|
||||||
|
) {
|
||||||
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests
|
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests
|
||||||
mock::spawn_with(|s| {
|
mock::spawn_with(|s| {
|
||||||
let (svc, worker) = Buffer::pair(s, 10);
|
let (svc, worker) = Buffer::pair(s, 10);
|
||||||
|
|
Loading…
Reference in New Issue