spawn-ready: Drives a service's readiness on an executor (#283)

Some layers cannot guarantee that they will poll inner services in a
timely fashion. For instance, the balancer polls its inner services to
check for readiness, but it does so randomly. If its inner service
must be polled several times to become ready, e.g., because it's driving
the initiation of a TLS connection, then the balancer may not drive the
handshake to completion.

The `SpawnReady` layer ensures that its inner service is driven to
readiness by spawning a background task.
This commit is contained in:
Oliver Gould 2019-05-29 09:57:46 -07:00 committed by GitHub
parent a611a14096
commit 42f4b7781e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 466 additions and 0 deletions

View File

@ -13,6 +13,7 @@ members = [
"tower-reconnect",
"tower-retry",
"tower-service",
"tower-spawn-ready",
"tower-test",
"tower-timeout",
"tower-util",

View File

@ -24,6 +24,7 @@ jobs:
- tower-reconnect
- tower-retry
- tower-service
- tower-spawn-ready
- tower-test
- tower-timeout
- tower-util

View File

@ -12,6 +12,7 @@ tower-load-shed = { path = "tower-load-shed" }
tower-reconnect = { path = "tower-reconnect" }
tower-retry = { path = "tower-retry" }
tower-service = { path = "tower-service" }
tower-spawn-ready = { path = "tower-spawn-ready" }
tower-test = { path = "tower-test" }
tower-timeout = { path = "tower-timeout" }
tower-util = { path = "tower-util" }

View File

@ -0,0 +1 @@
# 0.1.0 (unreleased)

View File

@ -0,0 +1,35 @@
[package]
name = "tower-spawn-ready"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-spawn-ready/0.1.0"
description = """
Drives service readiness via a spawned task
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
publish = false # FIXME
[dependencies]
futures = "0.1.25"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tower-util = "0.1.0"
tokio-executor = "0.1.7"
tokio-sync = "0.1.0"
[dev-dependencies]
tower = { version = "0.1.0", path = "../tower" }
tower-test = { version = "0.1.0", path = "../tower-test" }

25
tower-spawn-ready/LICENSE Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,13 @@
# Tower Spawn Ready
Spawn Ready ensures that its inner service is driven to readiness on an executor. Useful with pooling layers that may poll their inner service infrequently.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -0,0 +1,33 @@
//! Error types
use std::fmt;
use tokio_executor;
/// Error produced when spawning the worker fails
#[derive(Debug)]
pub struct SpawnError {
inner: tokio_executor::SpawnError,
}
/// Errors produced by `SpawnReady`.
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
// ===== impl SpawnError =====
impl SpawnError {
pub(crate) fn new(inner: tokio_executor::SpawnError) -> Self {
Self { inner }
}
}
impl fmt::Display for SpawnError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt(f)
}
}
impl std::error::Error for SpawnError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.inner)
}
}

View File

@ -0,0 +1,81 @@
use crate::error::Error;
use futures::{Async, Future, Poll};
use tokio_executor::TypedExecutor;
use tokio_sync::oneshot;
use tower_service::Service;
use tower_util::Ready;
pub struct BackgroundReady<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
ready: Ready<T, Request>,
tx: Option<oneshot::Sender<Result<T, Error>>>,
}
/// This trait allows you to use either Tokio's threaded runtime's executor or
/// the `current_thread` runtime's executor depending on if `T` is `Send` or
/// `!Send`.
pub trait BackgroundReadyExecutor<T, Request>: TypedExecutor<BackgroundReady<T, Request>>
where
T: Service<Request>,
T::Error: Into<Error>,
{
}
impl<T, Request, E> BackgroundReadyExecutor<T, Request> for E
where
E: TypedExecutor<BackgroundReady<T, Request>>,
T: Service<Request>,
T::Error: Into<Error>,
{
}
pub(crate) fn background_ready<T, Request>(
service: T,
) -> (
BackgroundReady<T, Request>,
oneshot::Receiver<Result<T, Error>>,
)
where
T: Service<Request>,
T::Error: Into<Error>,
{
let (tx, rx) = oneshot::channel();
let bg = BackgroundReady {
ready: Ready::new(service),
tx: Some(tx),
};
(bg, rx)
}
impl<T, Request> Future for BackgroundReady<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.tx.as_mut().expect("illegal state").poll_close() {
Ok(Async::Ready(())) | Err(()) => return Err(()),
Ok(Async::NotReady) => {}
}
let result = match self.ready.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(svc)) => Ok(svc),
Err(e) => Err(e.into()),
};
self.tx
.take()
.expect("illegal state")
.send(result)
.map_err(|_| ())?;
Ok(Async::Ready(()))
}
}

View File

@ -0,0 +1,52 @@
use crate::{error::Error, future::BackgroundReadyExecutor, service::SpawnReady};
use std::{fmt, marker::PhantomData};
use tokio_executor::DefaultExecutor;
use tower_layer::Layer;
use tower_service::Service;
/// Spawns tasks to drive its inner service to readiness.
pub struct SpawnReadyLayer<Request, E = DefaultExecutor> {
executor: E,
_p: PhantomData<fn(Request)>,
}
impl<Request> SpawnReadyLayer<Request, DefaultExecutor> {
pub fn new() -> Self {
SpawnReadyLayer {
executor: DefaultExecutor::current(),
_p: PhantomData,
}
}
}
impl<Request, E: Clone> SpawnReadyLayer<Request, E> {
pub fn with_executor(executor: E) -> Self {
SpawnReadyLayer {
executor,
_p: PhantomData,
}
}
}
impl<E, S, Request> Layer<S> for SpawnReadyLayer<Request, E>
where
S: Service<Request> + Send + 'static,
S::Error: Into<Error>,
E: BackgroundReadyExecutor<S, Request> + Clone,
{
type Service = SpawnReady<S, Request, E>;
fn layer(&self, service: S) -> Self::Service {
SpawnReady::with_executor(service, self.executor.clone())
}
}
impl<Request, E> fmt::Debug for SpawnReadyLayer<Request, E>
where
// Require E: Debug in case we want to print the executor at a later date
E: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("SpawnReadyLayer").finish()
}
}

View File

@ -0,0 +1,14 @@
#![doc(html_root_url = "https://docs.rs/tower-spawn-ready/0.1.0")]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
//! When an underlying service is not ready, drive it to readiness on a
//! background task.
pub mod error;
pub mod future;
mod layer;
mod service;
pub use crate::layer::SpawnReadyLayer;
pub use crate::service::SpawnReady;

View File

@ -0,0 +1,107 @@
use crate::{
error::{Error, SpawnError},
future::{background_ready, BackgroundReadyExecutor},
};
use futures::{future, try_ready, Async, Future, Poll};
use std::marker::PhantomData;
use tokio_executor::DefaultExecutor;
use tokio_sync::oneshot;
use tower_service::Service;
/// Spawns tasks to drive an inner service to readiness.
///
/// See crate level documentation for more details.
pub struct SpawnReady<T, Request, E>
where
T: Service<Request>,
T::Error: Into<Error>,
E: BackgroundReadyExecutor<T, Request>,
{
executor: E,
inner: Inner<T>,
_marker: PhantomData<fn(Request)>,
}
enum Inner<T> {
Service(Option<T>),
Future(oneshot::Receiver<Result<T, Error>>),
}
impl<T, Request> SpawnReady<T, Request, DefaultExecutor>
where
T: Service<Request> + Send + 'static,
T::Error: Into<Error>,
Request: 'static,
{
/// Creates a new `SpawnReady` wrapping `service`.
///
/// The default Tokio executor is used to drive service readiness, which
/// means that this method must be called while on the Tokio runtime.
pub fn new(service: T) -> Self {
Self::with_executor(service, DefaultExecutor::current())
}
}
impl<T, Request, E> SpawnReady<T, Request, E>
where
T: Service<Request> + Send,
T::Error: Into<Error>,
E: BackgroundReadyExecutor<T, Request>,
{
/// Creates a new `SpawnReady` wrapping `service`.
///
/// `executor` is used to spawn a new `BackgroundReady` task that is
/// dedicated to driving the inner service to readiness.
pub fn with_executor(service: T, executor: E) -> Self {
Self {
executor,
inner: Inner::Service(Some(service)),
_marker: PhantomData,
}
}
}
impl<T, Request, E> Service<Request> for SpawnReady<T, Request, E>
where
T: Service<Request> + Send,
T::Error: Into<Error>,
E: BackgroundReadyExecutor<T, Request>,
{
type Response = T::Response;
type Error = Error;
type Future = future::MapErr<T::Future, fn(T::Error) -> Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
loop {
self.inner = match self.inner {
Inner::Service(ref mut svc) => {
if svc
.as_mut()
.expect("illegal state")
.poll_ready()
.map_err(Into::into)?
.is_ready()
{
return Ok(Async::Ready(()));
}
let (bg, rx) = background_ready(svc.take().expect("illegal state"));
self.executor.spawn(bg).map_err(SpawnError::new)?;
Inner::Future(rx)
}
Inner::Future(ref mut fut) => {
let svc = try_ready!(fut.poll())?;
Inner::Service(Some(svc))
}
}
}
}
fn call(&mut self, request: Request) -> Self::Future {
match self.inner {
Inner::Service(Some(ref mut svc)) => svc.call(request).map_err(Into::into),
_ => unreachable!("poll_ready must be called"),
}
}
}

View File

@ -0,0 +1,102 @@
use futures::prelude::*;
use std::{thread, time::Duration};
use tokio_executor::{SpawnError, TypedExecutor};
use tower::Service;
use tower_spawn_ready::{error, SpawnReady};
use tower_test::mock;
#[test]
fn when_inner_is_not_ready() {
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
with_task(|| {
let poll = service.poll_ready();
assert!(poll.expect("poll_ready").is_not_ready());
});
// Make the service is Ready
handle.allow(1);
thread::sleep(Duration::from_millis(100));
with_task(|| {
let poll = service.poll_ready();
assert!(poll.expect("poll_ready").is_ready());
});
}
#[test]
fn when_inner_fails() {
//use std::error::Error as StdError;
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
with_task(|| {
let e = service.poll_ready().unwrap_err();
assert_eq!(e.to_string(), "foobar");
});
}
#[test]
fn when_spawn_fails() {
let (service, mut handle) = mock::pair::<(), ()>();
let exec = ExecFn(|_| Err(()));
let mut service = SpawnReady::with_executor(service, exec);
// Make the service NotReady so a background task is spawned.
handle.allow(0);
let err = with_task(|| service.poll_ready().expect_err("poll_ready should error"));
assert!(
err.is::<error::SpawnError>(),
"should be a SpawnError: {:?}",
err
);
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
struct Exec;
impl<F> TypedExecutor<F> for Exec
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
thread::spawn(move || {
fut.wait().unwrap();
});
Ok(())
}
}
struct ExecFn<Func>(Func);
impl<Func, F> TypedExecutor<F> for ExecFn<Func>
where
Func: Fn(F) -> Result<(), ()>,
F: Future<Item = (), Error = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
(self.0)(fut).map_err(|()| SpawnError::shutdown())
}
}
fn new_service() -> (SpawnReady<Mock, &'static str, Exec>, Handle) {
let (service, handle) = mock::pair();
let service = SpawnReady::with_executor(service, Exec);
(service, handle)
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::lazy;
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}