Test out Service::disarm for all tower services

This commit is contained in:
Jon Gjengset 2020-04-03 10:18:23 -04:00
parent 0520a6a467
commit 3e73ffc26b
No known key found for this signature in database
GPG Key ID: 3CB1EC545A706318
35 changed files with 327 additions and 2 deletions

View File

@ -75,6 +75,10 @@ pub use self::{identity::Identity, stack::Stack};
/// println!("request = {:?}, target = {:?}", request, self.target);
/// self.service.call(request)
/// }
///
/// fn disarm(&mut self) {
/// self.service.disarm()
/// }
/// }
/// ```
///

View File

@ -79,6 +79,8 @@ use std::task::{Context, Poll};
/// // Return the response as an immediate future
/// Box::pin(fut)
/// }
///
/// fn disarm(&mut self) {}
/// }
/// ```
///
@ -234,6 +236,79 @@ pub trait Service<Request> {
/// Implementations are permitted to panic if `call` is invoked without
/// obtaining `Poll::Ready(Ok(()))` from `poll_ready`.
fn call(&mut self, req: Request) -> Self::Future;
/// Undo a successful call to `poll_ready`.
///
/// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, the service must keep capacity
/// set aside for the coming request. `disarm` allows you to give up that reserved capacity if
/// you decide you do not wish to issue a request after all. After calling `disarm`, you must
/// call `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to issue another
/// request.
///
/// Returns `false` if capacity has not been reserved for this service (usually because
/// `poll_ready` was not previously called, or did not succeed).
///
/// # Motivation
///
/// If `poll_ready` reserves part of a service's finite capacity, callers need to send an item
/// shortly after `poll_ready` succeeds. If they do not, an idle caller may take up all the
/// slots of the channel, and prevent active callers from getting any requests through.
/// Consider this code that forwards from a channel to a `BufferService` (which has limited
/// capacity):
///
/// ```rust,ignore
/// let mut fut = None;
/// loop {
/// if let Some(ref mut fut) = fut {
/// let _ = ready!(fut.poll(cx));
/// let _ = fut.take();
/// }
/// ready!(buffer.poll_ready(cx));
/// if let Some(item) = ready!(rx.poll_next(cx)) {
/// fut = Some(buffer.call(item));
/// } else {
/// break;
/// }
/// }
/// ```
///
/// If many such forwarders exist, and they all forward into a single (cloned) `BufferService`,
/// then any number of the forwarders may be waiting for `rx.next` at the same time. While they
/// do, they are effectively each reducing the buffer's capacity by 1. If enough of these
/// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
/// for them through `poll_ready`, and the system will deadlock.
///
/// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
/// you have to block. We can then fix the code above by writing:
///
/// ```rust,ignore
/// let mut fut = None;
/// loop {
/// if let Some(ref mut fut) = fut {
/// let _ = ready!(fut.poll(cx));
/// let _ = fut.take();
/// }
/// ready!(buffer.poll_ready(cx));
/// let item = rx.poll_next(cx);
/// if let Poll::Ready(Ok(_)) = item {
/// // we're going to send the item below, so don't disarm
/// } else {
/// // give up our slot, since we won't need it for a while
/// buffer.disarm();
/// }
/// if let Some(item) = ready!(item) {
/// fut = Some(buffer.call(item));
/// } else {
/// break;
/// }
/// }
/// ```
///
/// # Panics
///
/// Implementations are permitted to panic if `disarm` is invoked without
/// obtaining `Poll::Ready(Ok(()))` from `poll_ready`.
fn disarm(&mut self);
}
impl<'a, S, Request> Service<Request> for &'a mut S
@ -251,6 +326,10 @@ where
fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
}
fn disarm(&mut self) {
(**self).disarm()
}
}
impl<S, Request> Service<Request> for Box<S>
@ -268,4 +347,8 @@ where
fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
}
fn disarm(&mut self) {
(**self).disarm()
}
}

View File

@ -185,6 +185,10 @@ impl<T, U> Service<T> for Mock<T, U> {
ResponseFuture::new(rx)
}
fn disarm(&mut self) {
self.can_send = false;
}
}
impl<T, U> Clone for Mock<T, U> {

View File

@ -55,7 +55,7 @@ hdrhistogram = { version = "6.0", optional = true }
indexmap = { version = "1.0.2", optional = true }
rand = { version = "0.7", features = ["small_rng"], optional = true }
slab = { version = "0.4", optional = true }
tokio = { version = "0.2", optional = true }
tokio = { version = "0.2.15", optional = true }
[dev-dependencies]
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }

View File

@ -67,6 +67,10 @@ where
_marker: PhantomData,
}
}
fn disarm(&mut self) {
self.inner.disarm()
}
}
impl<F, T, E, Req> Future for MakeFuture<F, Req>

View File

@ -281,6 +281,14 @@ where
.call_ready_index(index, request)
.map_err(Into::into)
}
fn disarm(&mut self) {
assert_ne!(
self.services.disarm(),
0,
"called disarm when poll_ready did not succeed"
)
}
}
impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {

View File

@ -427,6 +427,10 @@ where
fn call(&mut self, req: Req) -> Self::Future {
self.balance.call(req)
}
fn disarm(&mut self) {
self.balance.disarm()
}
}
#[doc(hidden)]
@ -462,4 +466,8 @@ impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc>
fn call(&mut self, req: Request) -> Self::Future {
self.svc.call(req)
}
fn disarm(&mut self) {
self.svc.disarm()
}
}

View File

@ -121,6 +121,13 @@ where
Ok(_) => ResponseFuture::new(rx),
}
}
fn disarm(&mut self) {
assert!(
self.tx.disarm(),
"called disarm when poll_ready did not succeed"
);
}
}
impl<T, Request> Clone for Buffer<T, Request>

View File

@ -52,4 +52,8 @@ where
ResponseFuture::new(request, check, inner)
}
fn disarm(&mut self) {
self.inner.disarm()
}
}

View File

@ -74,6 +74,10 @@ where
state: State::Delaying(tokio::time::delay_until(deadline), Some(request)),
}
}
fn disarm(&mut self) {
self.service.disarm()
}
}
impl<Request, S, F, T, E> Future for ResponseFuture<Request, S, F>

View File

@ -67,6 +67,10 @@ where
inner: self.service.call(request),
}
}
fn disarm(&mut self) {
self.service.disarm()
}
}
impl<R, F, T, E> Future for ResponseFuture<R, F>

View File

@ -185,6 +185,10 @@ where
inner: self.0.call(request),
}
}
fn disarm(&mut self) {
self.0.disarm()
}
}
impl<S, Request> std::future::Future for Future<S, Request>

View File

@ -77,6 +77,12 @@ where
b_fut,
}
}
fn disarm(&mut self) {
// poll_ready only succeeds when _both_ services are ready, so disarming both is fine
self.a.disarm();
self.b.disarm();
}
}
impl<AF, BF, T, AE, BE> Future for ResponseFuture<AF, BF>

View File

@ -84,6 +84,16 @@ where
ResponseFuture::new(future, self.limit.semaphore.clone())
}
fn disarm(&mut self) {
if self.limit.permit.is_acquired() {
// NOTE: even in this case there is a chance the user did not get Ready from poll_ready
// but we did what we could to check early!
self.inner.disarm()
} else {
panic!("poll_ready did not succeed, so cannot disarm");
}
}
}
#[cfg(feature = "load")]

View File

@ -107,6 +107,14 @@ where
State::Limited(..) => panic!("service not ready; poll_ready must be called first"),
}
}
fn disarm(&mut self) {
if let State::Ready { .. } = self.state {
self.inner.disarm()
} else {
panic!("poll_ready did not succeed, so cannot disarm");
}
}
}
#[cfg(feature = "load")]

View File

@ -52,6 +52,10 @@ where
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}
fn disarm(&mut self) {
self.inner.disarm()
}
}
/// Proxies `Discover` such that all changes are wrapped with a constant load.

View File

@ -182,6 +182,10 @@ where
self.service.call(req),
)
}
fn disarm(&mut self) {
self.service.disarm()
}
}
impl<S, I> Load for PeakEwma<S, I> {
@ -328,6 +332,8 @@ mod tests {
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}
fn disarm(&mut self) {}
}
/// The default RTT estimate decays, so that new nodes are considered if the

View File

@ -87,6 +87,10 @@ where
self.service.call(req),
)
}
fn disarm(&mut self) {
self.service.disarm()
}
}
// ===== impl PendingRequestsDiscover =====
@ -159,6 +163,8 @@ mod tests {
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}
fn disarm(&mut self) {}
}
#[test]

View File

@ -61,6 +61,15 @@ where
ResponseFuture::overloaded()
}
}
fn disarm(&mut self) {
if self.is_ready {
self.inner.disarm()
} else {
// we do not panic here since the user may still have called poll_ready
// and gotten a succeess.
}
}
}
impl<S: Clone> Clone for LoadShed<S> {

View File

@ -367,6 +367,35 @@ where
fut
}
/// Call `Service::disarm` on all ready services and return them to the not-ready set.
///
/// Returns the number of services disarmed this way.
///
/// Note that `ReadyCache` eagerly polls services (one call to `poll_ready` may ready several
/// services), so a call to disarm may disarm a large number of underlying services.
pub fn disarm(&mut self) -> usize {
let were_ready = self.ready.len();
// Take self.ready so we can call &mut self methods below
let mut ready = std::mem::take(&mut self.ready);
for (key, (mut svc, cancel)) in ready.drain(..) {
// If a new version of this service has been added to the
// unready set, don't overwrite it.
if !self.pending_contains(&key) {
// Disarm the once-ready service and mark it as pending
svc.disarm();
self.push_pending(key, svc, cancel);
}
}
// Restore the original IndexMap to preserve its allocation
std::mem::swap(&mut self.ready, &mut ready);
// Sanity check that the &mut self methods above didn't add stuff to the ready set
debug_assert!(ready.is_empty());
were_ready
}
}
// === Pending ===

View File

@ -146,6 +146,15 @@ where
let fut = service.call(request);
ResponseFuture::new(fut)
}
fn disarm(&mut self) {
// we must be in State::Connected if poll_ready succeeded
if let State::Connected(ref mut service) = self.state {
service.disarm()
} else {
panic!("poll_ready did not succeed, so cannot disarm");
}
}
}
impl<M, Target> fmt::Debug for Reconnect<M, Target>

View File

@ -55,4 +55,8 @@ where
ResponseFuture::new(cloned, self.clone(), future)
}
fn disarm(&mut self) {
self.service.disarm()
}
}

View File

@ -46,6 +46,10 @@ where
inner: self.inner.call(target),
}
}
fn disarm(&mut self) {
self.inner.disarm()
}
}
impl<F, T, E> Future for MakeFuture<F>

View File

@ -70,4 +70,12 @@ where
_ => unreachable!("poll_ready must be called"),
}
}
fn disarm(&mut self) {
if let Inner::Service(ref mut svc) = self.inner {
svc.as_mut().expect("illegal state").disarm()
} else {
panic!("poll_ready did not succeed, so cannot disarm");
}
}
}

View File

@ -22,6 +22,8 @@
//! println!("{}: {}", self.0, req);
//! ready(Ok(()))
//! }
//!
//! fn disarm(&mut self) {}
//! }
//!
//! #[tokio::main]
@ -134,4 +136,32 @@ where
self.not_ready.push_back(idx);
cl.call(req)
}
fn disarm(&mut self) {
// NOTE: we are not allowed to disarm a service that has not return success from
// `poll_ready`, since `disarm` implementations are allowed to panic in that case!
let not_ready: std::collections::HashSet<_> = self.not_ready.iter().cloned().collect();
for (idx, service) in self.services.iter_mut().enumerate() {
if !not_ready.contains(&idx) {
// service is ready, so disarm it
service.disarm();
self.not_ready.push_back(idx);
}
}
// TODO: it'd be great to be cleverer here and avoid the allocation
// we could want to pick a different strategy depending on whether most services are ready.
// with https://github.com/rust-lang/rust/pull/69425 we could do this by sorting not_ready:
/*
let not_ready = self.not_ready.make_contiguous();
not_ready.sort();
let mut at = 0;
for &mut not_ready_idx in not_ready {
for ready_idx in at..not_ready_idx {
self.services[ready_idx].disarm();
}
at = not_ready_idx + 1;
}
*/
}
}

View File

@ -52,4 +52,8 @@ where
ResponseFuture::new(response, sleep)
}
fn disarm(&mut self) {
self.inner.disarm()
}
}

View File

@ -53,6 +53,10 @@ impl<T, U, E> Service<T> for BoxService<T, U, E> {
fn call(&mut self, request: T) -> BoxFuture<U, E> {
self.inner.call(request)
}
fn disarm(&mut self) {
self.inner.disarm()
}
}
impl<T, U, E> fmt::Debug for BoxService<T, U, E>
@ -82,4 +86,8 @@ where
fn call(&mut self, request: Request) -> Self::Future {
Box::pin(self.inner.call(request))
}
fn disarm(&mut self) {
self.inner.disarm()
}
}

View File

@ -47,6 +47,10 @@ impl<T, U, E> Service<T> for UnsyncBoxService<T, U, E> {
fn call(&mut self, request: T) -> UnsyncBoxFuture<U, E> {
self.inner.call(request)
}
fn disarm(&mut self) {
self.inner.disarm()
}
}
impl<T, U, E> fmt::Debug for UnsyncBoxService<T, U, E>
@ -76,4 +80,8 @@ where
fn call(&mut self, request: Request) -> Self::Future {
Box::pin(self.inner.call(request))
}
fn disarm(&mut self) {
self.inner.disarm()
}
}

View File

@ -41,6 +41,8 @@ use tower_service::Service;
/// fn call(&mut self, req: &'static str) -> Self::Future {
/// ready(Ok(&req[..1]))
/// }
///
/// fn disarm(&mut self) {}
/// }
///
/// #[tokio::main]

View File

@ -55,6 +55,13 @@ where
B(service) => B(service.call(request)),
}
}
fn disarm(&mut self) {
match *self {
Either::A(ref mut s) => s.disarm(),
Either::B(ref mut s) => s.disarm(),
}
}
}
impl<A, B, T, AE, BE> Future for Either<A, B>

View File

@ -55,4 +55,12 @@ where
let inner = self.inner.as_mut().map(|i| i.call(request));
ResponseFuture::new(inner)
}
fn disarm(&mut self) {
if let Some(ref mut i) = self.inner {
i.disarm()
} else {
// None service is always ready, so user may well have called poll_ready
}
}
}

View File

@ -29,4 +29,6 @@ where
fn call(&mut self, req: Request) -> Self::Future {
(self.f)(req)
}
fn disarm(&mut self) {}
}

View File

@ -21,6 +21,9 @@ impl Service<Req> for Mock {
fn call(&mut self, req: Req) -> Self::Future {
self.0.call(req)
}
fn disarm(&mut self) {
self.0.disarm()
}
}
impl tower::load::Load for Mock {

View File

@ -25,6 +25,10 @@ impl Service<String> for MyService {
fn call(&mut self, _req: String) -> Self::Future {
ready(Ok(self.0))
}
fn disarm(&mut self) {
assert!(self.1);
}
}
#[test]

View File

@ -16,6 +16,7 @@ type Error = Box<dyn std::error::Error + Send + Sync>;
struct Srv {
admit: Rc<Cell<bool>>,
count: Rc<Cell<usize>>,
ready: bool,
}
impl Service<&'static str> for Srv {
type Response = &'static str;
@ -27,6 +28,7 @@ impl Service<&'static str> for Srv {
return Poll::Pending;
}
self.ready = true;
self.admit.set(false);
Poll::Ready(Ok(()))
}
@ -35,6 +37,14 @@ impl Service<&'static str> for Srv {
self.count.set(self.count.get() + 1);
ready(Ok(req))
}
fn disarm(&mut self) {
assert!(self.ready, "disarm called when poll_ready did not succeed");
self.ready = false;
if !self.admit.get() {
self.admit.set(true);
}
}
}
#[test]
@ -46,6 +56,7 @@ fn ordered() {
let srv = Srv {
count: count.clone(),
admit: admit.clone(),
ready: false,
};
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let ca = srv.call_all(rx);
@ -103,7 +114,8 @@ fn ordered() {
ca.take_service(),
Srv {
count: count.clone(),
admit
ready: true,
admit,
}
);
}