From 2c3a500c1063f18dc745e753bf79ef1a88cb935d Mon Sep 17 00:00:00 2001
From: Jane Lusby
Date: Wed, 24 Jun 2020 12:02:08 -0700
Subject: [PATCH] port tower-buffer over to generic error approach

---
 tower-test/src/mock/error.rs    |  33 ++++-
 tower/Cargo.toml                |   2 +-
 tower/examples/tower-balance.rs | 226 --------------------------------
 tower/src/buffer/error.rs       |  37 +-----
 tower/src/buffer/future.rs      |  70 +++++++---
 tower/src/buffer/layer.rs       |  17 ++-
 tower/src/buffer/message.rs     |   9 +-
 tower/src/buffer/service.rs     |  52 ++++----
 tower/src/buffer/worker.rs      |  69 +++++-----
 tower/src/builder/mod.rs        |   5 +-
 tower/src/lib.rs                |   3 -
 tower/tests/buffer/main.rs      |  18 +--
 12 files changed, 178 insertions(+), 363 deletions(-)
 delete mode 100644 tower/examples/tower-balance.rs

diff --git a/tower-test/src/mock/error.rs b/tower-test/src/mock/error.rs
index 84f1815..9ffdafd 100644
--- a/tower-test/src/mock/error.rs
+++ b/tower-test/src/mock/error.rs
@@ -2,7 +2,38 @@
 use std::{error, fmt};
 
-pub(crate) type Error = Box<dyn error::Error + Send + Sync>;
+// pub(crate) type Error = Box<dyn error::Error + Send + Sync>;
+
+/// Cloneable dyn Error
+#[derive(Debug, Clone)]
+pub struct Error {
+    inner: std::sync::Arc<dyn std::error::Error + Send + Sync + 'static>,
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.inner.fmt(f)
+    }
+}
+
+impl<E> From<E> for Error
+where
+    E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
+{
+    fn from(error: E) -> Self {
+        let boxed = error.into();
+        let inner = boxed.into();
+        Self { inner }
+    }
+}
+
+impl std::ops::Deref for Error {
+    type Target = dyn std::error::Error + Send + Sync + 'static;
+
+    fn deref(&self) -> &Self::Target {
+        self.inner.deref()
+    }
+}
 
 /// Error yielded when a mocked service does not yet accept requests.
 #[derive(Debug)]
diff --git a/tower/Cargo.toml b/tower/Cargo.toml
index 534280a..7eedd0a 100644
--- a/tower/Cargo.toml
+++ b/tower/Cargo.toml
@@ -24,7 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures", "service"]
 edition = "2018"
 
 [features]
-default = ["log"]
+default = ["log", "buffer"]
 log = ["tracing/log"]
 balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
 buffer = ["tokio/sync", "tokio/rt-core"]
diff --git a/tower/examples/tower-balance.rs b/tower/examples/tower-balance.rs
deleted file mode 100644
index 649cbeb..0000000
--- a/tower/examples/tower-balance.rs
+++ /dev/null
@@ -1,226 +0,0 @@
-//! Exercises load balancers with mocked services.
-
-use futures_core::{Stream, TryStream};
-use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
-use hdrhistogram::Histogram;
-use pin_project::pin_project;
-use rand::{self, Rng};
-use std::hash::Hash;
-use std::time::Duration;
-use std::{
-    pin::Pin,
-    task::{Context, Poll},
-};
-use tokio::time::{self, Instant};
-use tower::balance as lb;
-use tower::discover::{Change, Discover};
-use tower::limit::concurrency::ConcurrencyLimit;
-use tower::load;
-use tower::util::ServiceExt;
-use tower_service::Service;
-
-const REQUESTS: usize = 100_000;
-const CONCURRENCY: usize = 500;
-const DEFAULT_RTT: Duration = Duration::from_millis(30);
-static ENDPOINT_CAPACITY: usize = CONCURRENCY;
-static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
-    Duration::from_millis(1),
-    Duration::from_millis(5),
-    Duration::from_millis(10),
-    Duration::from_millis(10),
-    Duration::from_millis(10),
-    Duration::from_millis(100),
-    Duration::from_millis(100),
-    Duration::from_millis(100),
-    Duration::from_millis(500),
-    Duration::from_millis(1000),
-];
-
-struct Summary {
-    latencies: Histogram<u64>,
-    start: Instant,
-    count_by_instance: [usize; 10],
-}
-
-#[tokio::main]
-async fn main() {
-    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();
-
-    println!("REQUESTS={}", REQUESTS);
-    println!("CONCURRENCY={}", CONCURRENCY);
-    println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
-    print!("MAX_ENDPOINT_LATENCIES=[");
-    for max in &MAX_ENDPOINT_LATENCIES {
-        let l = max.as_secs() * 1_000 + u64::from(max.subsec_nanos() / 1_000 / 1_000);
-        print!("{}ms, ", l);
-    }
-    println!("]");
-
-    let decay = Duration::from_secs(10);
-    let d = gen_disco();
-    let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
-        d,
-        DEFAULT_RTT,
-        decay,
-        load::CompleteOnResponse::default(),
-    ));
-    run("P2C+PeakEWMA...", pe).await;
-
-    let d = gen_disco();
-    let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
-        d,
-        load::CompleteOnResponse::default(),
-    ));
-    run("P2C+LeastLoaded...", ll).await;
-}
-
-type Error = Box<dyn std::error::Error + Send + Sync>;
-
-type Key = usize;
-
-#[pin_project]
-struct Disco<S>(Vec<(Key, S)>);
-
-impl<S> Stream for Disco<S>
-where
-    S: Service<Req, Response = Rsp, Error = Error>,
-{
-    type Item = Result<Change<Key, S>, Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match self.project().0.pop() {
-            Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
-            None => {
-                // there may be more later
-                Poll::Pending
-            }
-        }
-    }
-}
-
-fn gen_disco() -> impl Discover<
-    Key = Key,
-    Error = Error,
-    Service = ConcurrencyLimit<
-        impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
-    >,
-> + Send {
-    Disco(
-        MAX_ENDPOINT_LATENCIES
-            .iter()
-            .enumerate()
-            .map(|(instance, latency)| {
-                let svc = tower::service_fn(move |_| {
-                    let start = Instant::now();
-
-                    let maxms = u64::from(latency.subsec_nanos() / 1_000 / 1_000)
-                        .saturating_add(latency.as_secs().saturating_mul(1_000));
-                    let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
-
-                    async move {
-                        time::delay_until(start + latency).await;
-                        let latency = start.elapsed();
-                        Ok(Rsp { latency, instance })
-                    }
-                });
-
-                (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
-            })
-            .collect(),
-    )
-}
-
-async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
-where
-    D: Discover + Unpin + Send + 'static,
-    D::Error: Into<Error>,
-    D::Key: Clone + Send + Hash,
-    D::Service: Service<Req, Response = Rsp> + load::Load + Send,
-    <D::Service as Service<Req>>::Error: Into<Error>,
-    <D::Service as Service<Req>>::Future: Send,
-    <D::Service as load::Load>::Metric: std::fmt::Debug,
-{
-    println!("{}", name);
-
-    let requests = stream::repeat(Req).take(REQUESTS);
-    let service = ConcurrencyLimit::new(lb, CONCURRENCY);
-    let responses = service.call_all(requests).unordered();
-
-    compute_histo(responses).await.unwrap().report();
-}
-
-async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
-where
-    S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
-{
-    let mut summary = Summary::new();
-    while let Some(rsp) = times.try_next().await? {
-        summary.count(rsp);
-    }
-    Ok(summary)
-}
-
-impl Summary {
-    fn new() -> Self {
-        Self {
-            // The max delay is 2000ms. At 3 significant figures.
-            latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
-            start: Instant::now(),
-            count_by_instance: [0; 10],
-        }
-    }
-
-    fn count(&mut self, rsp: Rsp) {
-        let ms = rsp.latency.as_secs() * 1_000;
-        let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
-        self.latencies += ms;
-        self.count_by_instance[rsp.instance] += 1;
-    }
-
-    fn report(&self) {
-        let mut total = 0;
-        for c in &self.count_by_instance {
-            total += c;
-        }
-        for (i, c) in self.count_by_instance.iter().enumerate() {
-            let p = *c as f64 / total as f64 * 100.0;
-            println!("  [{:02}] {:>5.01}%", i, p);
-        }
-
-        println!("  wall {:4}s", self.start.elapsed().as_secs());
-
-        if self.latencies.len() < 2 {
-            return;
-        }
-        println!("  p50  {:4}ms", self.latencies.value_at_quantile(0.5));
-
-        if self.latencies.len() < 10 {
-            return;
-        }
-        println!("  p90  {:4}ms", self.latencies.value_at_quantile(0.9));
-
-        if self.latencies.len() < 50 {
-            return;
-        }
-        println!("  p95  {:4}ms", self.latencies.value_at_quantile(0.95));
-
-        if self.latencies.len() < 100 {
-            return;
-        }
-        println!("  p99  {:4}ms", self.latencies.value_at_quantile(0.99));
-
-        if self.latencies.len() < 1000 {
-            return;
-        }
-        println!("  p999 {:4}ms", self.latencies.value_at_quantile(0.999));
-    }
-}
-
-#[derive(Debug, Clone)]
-struct Req;
-
-#[derive(Debug)]
-struct Rsp {
-    latency: Duration,
-    instance: usize,
-}
diff --git a/tower/src/buffer/error.rs b/tower/src/buffer/error.rs
index f875390..0528002 100644
--- a/tower/src/buffer/error.rs
+++ b/tower/src/buffer/error.rs
@@ -1,47 +1,12 @@
 //! Error types for the `Buffer` middleware.
 
-use crate::BoxError;
-use std::{fmt, sync::Arc};
-
-/// An error produced by a `Service` wrapped by a `Buffer`
-#[derive(Debug)]
-pub struct ServiceError {
-    inner: Arc<BoxError>,
-}
+use std::fmt;
 
 /// An error produced when the a buffer's worker closes unexpectedly.
 pub struct Closed {
     _p: (),
 }
 
-// ===== impl ServiceError =====
-
-impl ServiceError {
-    pub(crate) fn new(inner: BoxError) -> ServiceError {
-        let inner = Arc::new(inner);
-        ServiceError { inner }
-    }
-
-    // Private to avoid exposing `Clone` trait as part of the public API
-    pub(crate) fn clone(&self) -> ServiceError {
-        ServiceError {
-            inner: self.inner.clone(),
-        }
-    }
-}
-
-impl fmt::Display for ServiceError {
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        write!(fmt, "buffered service failed: {}", self.inner)
-    }
-}
-
-impl std::error::Error for ServiceError {
-    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
-        Some(&**self.inner)
-    }
-}
-
 // ===== impl Closed =====
 
 impl Closed {
diff --git a/tower/src/buffer/future.rs b/tower/src/buffer/future.rs
index 6aeff36..5d15006 100644
--- a/tower/src/buffer/future.rs
+++ b/tower/src/buffer/future.rs
@@ -4,6 +4,7 @@ use super::{error::Closed, message};
 use futures_core::ready;
 use pin_project::pin_project;
 use std::{
+    fmt::Debug,
     future::Future,
     pin::Pin,
     task::{Context, Poll},
@@ -11,40 +12,79 @@
 
 /// Future that completes when the buffered service eventually services the submitted request.
 #[pin_project]
-#[derive(Debug)]
-pub struct ResponseFuture<T> {
+pub struct ResponseFuture<S, E2, Request>
+where
+    S: crate::Service<Request>,
+{
     #[pin]
-    state: ResponseState<T>,
+    state: ResponseState<S, E2, Request>,
+}
+
+impl<S, E2, Request> Debug for ResponseFuture<S, E2, Request>
+where
+    S: crate::Service<Request>,
+    S::Future: Debug,
+    S::Error: Debug,
+    E2: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ResponseFuture")
+            .field("state", &self.state)
+            .finish()
+    }
 }
 
 #[pin_project(project = ResponseStateProj)]
-#[derive(Debug)]
-enum ResponseState<T> {
-    Failed(Option<crate::BoxError>),
-    Rx(#[pin] message::Rx<T>),
-    Poll(#[pin] T),
+enum ResponseState<S, E2, Request>
+where
+    S: crate::Service<Request>,
+{
+    Failed(Option<E2>),
+    Rx(#[pin] message::Rx<S::Future, S::Error>),
+    Poll(#[pin] S::Future),
 }
 
-impl<T> ResponseFuture<T> {
-    pub(crate) fn new(rx: message::Rx<T>) -> Self {
+impl<S, E2, Request> Debug for ResponseState<S, E2, Request>
+where
+    S: crate::Service<Request>,
+    S::Future: Debug,
+    S::Error: Debug,
+    E2: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ResponseState::Failed(e) => f.debug_tuple("ResponseState::Failed").field(e).finish(),
+            ResponseState::Rx(rx) => f.debug_tuple("ResponseState::Rx").field(rx).finish(),
+            ResponseState::Poll(fut) => f.debug_tuple("ResponseState::Poll").field(fut).finish(),
+        }
+    }
+}
+
+impl<S, E2, Request> ResponseFuture<S, E2, Request>
+where
+    S: crate::Service<Request>,
+{
+    pub(crate) fn new(rx: message::Rx<S::Future, S::Error>) -> Self {
         ResponseFuture {
             state: ResponseState::Rx(rx),
         }
     }
 
-    pub(crate) fn failed(err: crate::BoxError) -> Self {
+    pub(crate) fn failed(err: E2) -> Self {
         ResponseFuture {
             state: ResponseState::Failed(Some(err)),
         }
     }
 }
 
-impl<F, T, E> Future for ResponseFuture<F>
+impl<S, E2, Request> Future for ResponseFuture<S, E2, Request>
 where
-    F: Future<Output = Result<T, E>>,
-    E: Into<crate::BoxError>,
+    S: crate::Service<Request>,
+    S::Future: Future<Output = Result<S::Response, S::Error>>,
+    S::Error: Into<E2>,
+    crate::buffer::error::Closed: Into<E2>,
 {
-    type Output = Result<T, crate::BoxError>;
+    type Output = Result<S::Response, E2>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let mut this = self.project();
diff --git a/tower/src/buffer/layer.rs b/tower/src/buffer/layer.rs
index 5bddc92..0f6a8c5 100644
--- a/tower/src/buffer/layer.rs
+++ b/tower/src/buffer/layer.rs
@@ -9,12 +9,13 @@ use tower_service::Service;
 /// which means that this layer can only be used on the Tokio runtime.
 ///
 /// See the module documentation for more details.
-pub struct BufferLayer<Request> {
+pub struct BufferLayer<Request, E2> {
     bound: usize,
     _p: PhantomData<fn(Request)>,
+    _e: PhantomData<fn(E2)>,
 }
 
-impl<Request> BufferLayer<Request> {
+impl<Request, E2> BufferLayer<Request, E2> {
     /// Creates a new `BufferLayer` with the provided `bound`.
     ///
     /// `bound` gives the maximal number of requests that can be queued for the service before
@@ -33,25 +34,29 @@ impl<Request> BufferLayer<Request> {
         BufferLayer {
             bound,
             _p: PhantomData,
+            _e: PhantomData,
         }
     }
 }
 
-impl<S, Request> Layer<S> for BufferLayer<Request>
+impl<S, Request, E2> Layer<S> for BufferLayer<Request, E2>
 where
     S: Service<Request> + Send + 'static,
     S::Future: Send,
-    S::Error: Into<crate::BoxError> + Send + Sync,
+    S::Error: Clone + Into<E2> + Send + Sync + std::fmt::Display,
+    Request: Send + 'static,
+    E2: Send + 'static,
+    crate::buffer::error::Closed: Into<E2>,
     Request: Send + 'static,
 {
-    type Service = Buffer<S, Request>;
+    type Service = Buffer<S, Request, E2>;
 
     fn layer(&self, service: S) -> Self::Service {
         Buffer::new(service, self.bound)
     }
 }
 
-impl<Request> fmt::Debug for BufferLayer<Request> {
+impl<Request, E2> fmt::Debug for BufferLayer<Request, E2> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("BufferLayer")
             .field("bound", &self.bound)
diff --git a/tower/src/buffer/message.rs b/tower/src/buffer/message.rs
index 6d13aa1..ba9ae7f 100644
--- a/tower/src/buffer/message.rs
+++ b/tower/src/buffer/message.rs
@@ -1,16 +1,15 @@
-use super::error::ServiceError;
 use tokio::sync::oneshot;
 
 /// Message sent over buffer
 #[derive(Debug)]
-pub(crate) struct Message<Request, Fut> {
+pub(crate) struct Message<Request, Fut, E> {
     pub(crate) request: Request,
-    pub(crate) tx: Tx<Fut>,
+    pub(crate) tx: Tx<Fut, E>,
     pub(crate) span: tracing::Span,
 }
 
 /// Response sender
-pub(crate) type Tx<T> = oneshot::Sender<Result<T, ServiceError>>;
+pub(crate) type Tx<T, E> = oneshot::Sender<Result<T, E>>;
 
 /// Response receiver
-pub(crate) type Rx<T> = oneshot::Receiver<Result<T, ServiceError>>;
+pub(crate) type Rx<T, E> = oneshot::Receiver<Result<T, E>>;
diff --git a/tower/src/buffer/service.rs b/tower/src/buffer/service.rs
index ce315ff..19c29cf 100644
--- a/tower/src/buffer/service.rs
+++ b/tower/src/buffer/service.rs
@@ -13,18 +13,20 @@ use tower_service::Service;
 ///
 /// See the module documentation for more details.
 #[derive(Debug)]
-pub struct Buffer<T, Request>
+pub struct Buffer<S, Request, E2>
 where
-    T: Service<Request>,
+    S: Service<Request>,
 {
-    tx: mpsc::Sender<Message<Request, T::Future>>,
-    handle: Handle,
+    tx: mpsc::Sender<Message<Request, S::Future, S::Error>>,
+    handle: Handle<S::Error, E2>,
 }
 
-impl<T, Request> Buffer<T, Request>
+impl<S, Request, E2> Buffer<S, Request, E2>
 where
-    T: Service<Request>,
-    T::Error: Into<crate::BoxError>,
+    S: Service<Request>,
+    S::Error: Into<E2> + Clone,
+    E2: Send + 'static,
+    crate::buffer::error::Closed: Into<E2>,
 {
     /// Creates a new `Buffer` wrapping `service`.
     ///
@@ -43,11 +45,11 @@
     /// If you do not, all the slots in the buffer may be held up by futures that have just called
     /// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
     /// requests.
-    pub fn new(service: T, bound: usize) -> Self
+    pub fn new(service: S, bound: usize) -> Self
     where
-        T: Send + 'static,
-        T::Future: Send,
-        T::Error: Send + Sync,
+        S: Send + 'static,
+        S::Future: Send,
+        S::Error: Send + Sync + std::fmt::Display,
         Request: Send + 'static,
     {
         let (tx, rx) = mpsc::channel(bound);
@@ -61,10 +63,10 @@
     /// This is useful if you do not want to spawn directly onto the `tokio` runtime
    /// but instead want to use your own executor. This will return the `Buffer` and
     /// the background `Worker` that you can then spawn.
-    pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
+    pub fn pair(service: S, bound: usize) -> (Buffer<S, Request, E2>, Worker<S, Request, E2>)
     where
-        T: Send + 'static,
-        T::Error: Send + Sync,
+        S: Send + 'static,
+        S::Error: Send + Sync,
         Request: Send + 'static,
     {
         let (tx, rx) = mpsc::channel(bound);
@@ -72,23 +74,25 @@
         (Buffer { tx, handle }, worker)
     }
 
-    fn get_worker_error(&self) -> crate::BoxError {
+    fn get_worker_error(&self) -> E2 {
         self.handle.get_error_on_closed()
     }
 }
 
-impl<T, Request> Service<Request> for Buffer<T, Request>
+impl<S, Request, E2> Service<Request> for Buffer<S, Request, E2>
 where
-    T: Service<Request>,
-    T::Error: Into<crate::BoxError>,
+    S: Service<Request>,
+    crate::buffer::error::Closed: Into<E2>,
+    S::Error: Into<E2> + Clone,
+    E2: Send + 'static,
 {
-    type Response = T::Response;
-    type Error = crate::BoxError;
-    type Future = ResponseFuture<T::Future>;
+    type Response = S::Response;
+    type Error = E2;
+    type Future = ResponseFuture<S, E2, Request>;
 
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         // If the inner service has errored, then we error here.
-        if let Err(_) = ready!(self.tx.poll_ready(cx)) {
+        if ready!(self.tx.poll_ready(cx)).is_err() {
             Poll::Ready(Err(self.get_worker_error()))
         } else {
             Poll::Ready(Ok(()))
@@ -126,9 +130,9 @@
     }
 }
 
-impl<T, Request> Clone for Buffer<T, Request>
+impl<S, Request, E2> Clone for Buffer<S, Request, E2>
 where
-    T: Service<Request>,
+    S: Service<Request>,
 {
     fn clone(&self) -> Self {
         Self {
diff --git a/tower/src/buffer/worker.rs b/tower/src/buffer/worker.rs
index 2c2ae10..001fce9 100644
--- a/tower/src/buffer/worker.rs
+++ b/tower/src/buffer/worker.rs
@@ -1,12 +1,10 @@
-use super::{
-    error::{Closed, ServiceError},
-    message::Message,
-};
+use super::{error::Closed, message::Message};
 use futures_core::ready;
 use pin_project::pin_project;
 use std::sync::{Arc, Mutex};
 use std::{
     future::Future,
+    marker::PhantomData,
     pin::Pin,
     task::{Context, Poll},
 };
@@ -22,36 +20,38 @@ use tower_service::Service;
 /// implement (only call).
 #[pin_project]
 #[derive(Debug)]
-pub struct Worker<T, Request>
+pub struct Worker<S, Request, E2>
 where
-    T: Service<Request>,
-    T::Error: Into<crate::BoxError>,
+    S: Service<Request>,
+    S::Error: Into<E2>,
 {
-    current_message: Option<Message<Request, T::Future>>,
-    rx: mpsc::Receiver<Message<Request, T::Future>>,
-    service: T,
+    current_message: Option<Message<Request, S::Future, S::Error>>,
+    rx: mpsc::Receiver<Message<Request, S::Future, S::Error>>,
+    service: S,
     finish: bool,
-    failed: Option<ServiceError>,
-    handle: Handle,
+    failed: Option<S::Error>,
+    handle: Handle<S::Error, E2>,
 }
 
 /// Get the error out
 #[derive(Debug)]
-pub(crate) struct Handle {
-    inner: Arc<Mutex<Option<ServiceError>>>,
+pub(crate) struct Handle<E, E2> {
+    inner: Arc<Mutex<Option<E>>>,
+    _e: PhantomData<E2>,
 }
 
-impl<T, Request> Worker<T, Request>
+impl<S, Request, E2> Worker<S, Request, E2>
 where
-    T: Service<Request>,
-    T::Error: Into<crate::BoxError>,
+    S: Service<Request>,
+    S::Error: Into<E2> + Clone,
 {
     pub(crate) fn new(
-        service: T,
-        rx: mpsc::Receiver<Message<Request, T::Future>>,
-    ) -> (Handle, Worker<T, Request>) {
+        service: S,
+        rx: mpsc::Receiver<Message<Request, S::Future, S::Error>>,
+    ) -> (Handle<S::Error, E2>, Worker<S, Request, E2>) {
         let handle = Handle {
             inner: Arc::new(Mutex::new(None)),
+            _e: PhantomData,
         };
 
         let worker = Worker {
@@ -70,10 +70,11 @@ where
     ///
     /// If a `Message` is returned, the `bool` is true if this is the first time we received this
     /// message, and false otherwise (i.e., we tried to forward it to the backing service before).
+    #[allow(clippy::type_complexity)]
    fn poll_next_msg(
         &mut self,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
+    ) -> Poll<Option<(Message<Request, S::Future, S::Error>, bool)>> {
         if self.finish {
             // We've already received None and are shutting down
             return Poll::Ready(None);
@@ -105,7 +106,7 @@ where
         Poll::Ready(None)
     }
 
-    fn failed(&mut self, error: crate::BoxError) {
+    fn failed(&mut self, error: S::Error) {
         // The underlying service failed when we called `poll_ready` on it with the given `error`. We
         // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
         // an `Arc`, send that `Arc` to all pending requests, and store it so that subsequent
@@ -118,8 +119,6 @@ where
         // request. We do this by *first* exposing the error, *then* closing the channel used to
         // send more requests (so the client will see the error when the send fails), and *then*
         // sending the error to all outstanding requests.
-        let error = ServiceError::new(error);
-
         let mut inner = self.handle.inner.lock().unwrap();
 
         if inner.is_some() {
@@ -139,10 +138,10 @@ where
     }
 }
 
-impl<T, Request> Future for Worker<T, Request>
+impl<S, Request, E2> Future for Worker<S, Request, E2>
 where
-    T: Service<Request>,
-    T::Error: Into<crate::BoxError>,
+    S: Service<Request>,
+    S::Error: Into<E2> + Clone + std::fmt::Display,
 {
     type Output = ();
@@ -185,8 +184,7 @@ where
                         self.current_message = Some(msg);
                         return Poll::Pending;
                     }
-                    Poll::Ready(Err(e)) => {
-                        let error = e.into();
+                    Poll::Ready(Err(error)) => {
                         tracing::debug!({ %error }, "service failed");
                         drop(_guard);
                         self.failed(error);
@@ -208,8 +206,12 @@ where
     }
 }
 
-impl Handle {
-    pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
+impl<E, E2> Handle<E, E2>
+where
+    E: Clone + Into<E2>,
+    crate::buffer::error::Closed: Into<E2>,
+{
+    pub(crate) fn get_error_on_closed(&self) -> E2 {
         self.inner
             .lock()
             .unwrap()
@@ -219,10 +221,11 @@ impl Handle {
     }
 }
 
-impl Clone for Handle {
-    fn clone(&self) -> Handle {
+impl<E, E2> Clone for Handle<E, E2> {
+    fn clone(&self) -> Handle<E, E2> {
         Handle {
             inner: self.inner.clone(),
+            _e: PhantomData,
         }
     }
 }
diff --git a/tower/src/builder/mod.rs b/tower/src/builder/mod.rs
index 2061443..a99e1d5 100644
--- a/tower/src/builder/mod.rs
+++ b/tower/src/builder/mod.rs
@@ -106,6 +106,7 @@ pub struct ServiceBuilder<L> {
     layer: L,
 }
 
+#[allow(clippy::new_without_default)]
 impl ServiceBuilder<Identity> {
     /// Create a new `ServiceBuilder`.
     pub fn new() -> Self {
@@ -125,10 +126,10 @@ impl<L> ServiceBuilder<L> {
     /// Buffer requests when when the next layer is out of capacity.
     #[cfg(feature = "buffer")]
-    pub fn buffer<Request>(
+    pub fn buffer<Request, E2>(
         self,
         bound: usize,
-    ) -> ServiceBuilder<Stack<crate::buffer::BufferLayer<Request>, L>> {
+    ) -> ServiceBuilder<Stack<crate::buffer::BufferLayer<Request, E2>, L>> {
         self.layer(crate::buffer::BufferLayer::new(bound))
     }
diff --git a/tower/src/lib.rs b/tower/src/lib.rs
index 3049301..8df5d01 100644
--- a/tower/src/lib.rs
+++ b/tower/src/lib.rs
@@ -81,6 +81,3 @@ pub use tower_service::Service;
 mod sealed {
     pub trait Sealed {}
 }
-
-/// Alias for a type-erased error type.
-pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
diff --git a/tower/tests/buffer/main.rs b/tower/tests/buffer/main.rs
index 5fa32c5..3e4b829 100644
--- a/tower/tests/buffer/main.rs
+++ b/tower/tests/buffer/main.rs
@@ -79,8 +79,6 @@ async fn when_inner_is_not_ready() {
 
 #[tokio::test]
 async fn when_inner_fails() {
-    use std::error::Error as StdError;
-
     let (mut service, mut handle) = new_service();
 
     // Make the service NotReady
@@ -91,13 +89,8 @@ async fn when_inner_fails() {
     let_worker_work();
     let e = assert_ready_err!(res1.poll());
-    if let Some(e) = e.downcast_ref::<error::ServiceError>() {
-        let e = e.source().unwrap();
-        assert_eq!(e.to_string(), "foobar");
-    } else {
-        panic!("unexpected error type: {:?}", e);
-    }
+    assert_eq!(e.to_string(), "foobar");
 }
 
 #[tokio::test]
@@ -110,7 +103,7 @@ async fn poll_ready_when_worker_is_dropped_early() {
 
     drop(worker);
 
-    let err = assert_ready_err!(service.poll_ready());
+    let err: mock::error::Error = assert_ready_err!(service.poll_ready());
 
     assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
 }
@@ -130,14 +123,17 @@ async fn response_future_when_worker_is_dropped_early() {
     drop(worker);
 
     let_worker_work();
-    let err = assert_ready_err!(response.poll());
+    let err: mock::error::Error = assert_ready_err!(response.poll());
     assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
 }
 
 type Mock = mock::Mock<&'static str, &'static str>;
 type Handle = mock::Handle<&'static str, &'static str>;
 
-fn new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
+fn new_service() -> (
+    mock::Spawn<Buffer<Mock, &'static str, mock::error::Error>>,
+    Handle,
+) {
     // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
     mock::spawn_with(|s| {
         let (svc, worker) = Buffer::pair(s, 10);
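
Reviewer note, not part of the patch: a minimal usage sketch of the generic error
parameter this diff threads through Buffer. `AppError` and the greeting service are
invented for illustration only; the sketch assumes nothing beyond the bounds the diff
introduces (`S::Error: Into<E2> + Clone + Display` and `error::Closed: Into<E2>`).

use tower::buffer::{error::Closed, Buffer};

// Hypothetical caller-chosen error type standing in for E2.
#[derive(Debug, Clone)]
enum AppError {
    // Produced when the buffer worker is gone (converted from Closed).
    WorkerGone,
    // Produced when the wrapped service itself fails.
    Upstream(String),
}

impl std::fmt::Display for AppError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AppError::WorkerGone => write!(f, "buffer worker closed"),
            AppError::Upstream(msg) => write!(f, "service failed: {}", msg),
        }
    }
}

// Satisfies the new `Closed: Into<E2>` bound.
impl From<Closed> for AppError {
    fn from(_: Closed) -> Self {
        AppError::WorkerGone
    }
}

#[tokio::main]
async fn main() {
    // An inner service whose error type is already AppError, so the
    // `S::Error: Into<E2> + Clone` bound holds via the identity conversion.
    let svc = tower::service_fn(|name: String| async move {
        if name.is_empty() {
            Err(AppError::Upstream("empty request".to_string()))
        } else {
            Ok(format!("hello, {}", name))
        }
    });

    // E2 = AppError is fixed at the type level: the buffered service now
    // yields Result<String, AppError> instead of Result<_, BoxError>.
    let buffered: Buffer<_, String, AppError> = Buffer::new(svc, 32);
    let _ = buffered;
}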