reconnect: Rework to allow real reconnecting (#437)

Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
This commit is contained in:
Lucio Franco 2020-04-14 16:42:37 -04:00 committed by GitHub
parent d34019045f
commit 8a73440c1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 36 deletions

View File

@ -51,8 +51,12 @@ pub mod builder;
pub mod layer; pub mod layer;
#[cfg(feature = "util")] #[cfg(feature = "util")]
#[doc(inline)]
pub use self::util::{service_fn, ServiceExt}; pub use self::util::{service_fn, ServiceExt};
#[doc(inline)]
pub use crate::builder::ServiceBuilder; pub use crate::builder::ServiceBuilder;
#[doc(inline)]
pub use tower_layer::Layer; pub use tower_layer::Layer;
#[doc(inline)]
pub use tower_service::Service; pub use tower_service::Service;

View File

@ -1,5 +1,5 @@
use super::Error; use super::Error;
use pin_project::pin_project; use pin_project::{pin_project, project};
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -9,25 +9,50 @@ use std::{
/// Future that resolves to the response or failure to connect. /// Future that resolves to the response or failure to connect.
#[pin_project] #[pin_project]
#[derive(Debug)] #[derive(Debug)]
pub struct ResponseFuture<F> { pub struct ResponseFuture<F, E> {
#[pin] #[pin]
inner: F, inner: Inner<F, E>,
} }
impl<F> ResponseFuture<F> { #[pin_project]
#[derive(Debug)]
enum Inner<F, E> {
Future(#[pin] F),
Error(Option<E>),
}
impl<F, E> ResponseFuture<F, E> {
pub(crate) fn new(inner: F) -> Self { pub(crate) fn new(inner: F) -> Self {
ResponseFuture { inner } ResponseFuture {
inner: Inner::Future(inner),
}
}
pub(crate) fn error(error: E) -> Self {
ResponseFuture {
inner: Inner::Error(Some(error)),
}
} }
} }
impl<F, T, E> Future for ResponseFuture<F> impl<F, T, E, ME> Future for ResponseFuture<F, ME>
where where
F: Future<Output = Result<T, E>>, F: Future<Output = Result<T, E>>,
E: Into<Error>, E: Into<Error>,
ME: Into<Error>,
{ {
type Output = Result<T, Error>; type Output = Result<T, Error>;
#[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map_err(Into::into) let me = self.project();
#[project]
match me.inner.project() {
Inner::Future(fut) => fut.poll(cx).map_err(Into::into),
Inner::Error(e) => {
let e = e.take().expect("Polled after ready.").into();
Poll::Ready(Err(e))
}
}
} }
} }

View File

@ -1,8 +1,21 @@
#![allow(missing_docs)] //! Reconnect services when they fail.
//!
//! Reconnect takes some [`MakeService`] and transforms it into a
//! [`Service`]. It then attempts to lazily connect and
//! reconnect on failure. The `Reconnect` service becomes unavailable
//! when the inner `MakeService::poll_ready` returns an error. When the
//! connection future returned from `MakeService::call` fails this will be
//! returned in the next call to `Reconnect::call`. This allows the user to
//! call the service again even if the inner `MakeService` was unable to
//! connect on the last call.
//!
//! [`MakeService`]: ../make/trait.MakeService.html
//! [`Service`]: ../trait.Service.html
pub mod future; mod future;
pub use future::ResponseFuture;
use self::future::ResponseFuture;
use crate::make::MakeService; use crate::make::MakeService;
use std::fmt; use std::fmt;
use std::{ use std::{
@ -13,6 +26,8 @@ use std::{
use tower_service::Service; use tower_service::Service;
use tracing::trace; use tracing::trace;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
/// Reconnect to failed services. /// Reconnect to failed services.
pub struct Reconnect<M, Target> pub struct Reconnect<M, Target>
where where
@ -21,10 +36,9 @@ where
mk_service: M, mk_service: M,
state: State<M::Future, M::Response>, state: State<M::Future, M::Response>,
target: Target, target: Target,
error: Option<M::Error>,
} }
type Error = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug)] #[derive(Debug)]
enum State<F, S> { enum State<F, S> {
Idle, Idle,
@ -37,18 +51,12 @@ where
M: Service<Target>, M: Service<Target>,
{ {
/// Lazily connect and reconnect to a Service. /// Lazily connect and reconnect to a Service.
pub fn new<S, Request>(mk_service: M, target: Target) -> Self pub fn new<S, Request>(mk_service: M, target: Target) -> Self {
where
M: Service<Target, Response = S>,
S: Service<Request>,
M::Error: Into<Error>,
S::Error: Into<Error>,
Target: Clone,
{
Reconnect { Reconnect {
mk_service, mk_service,
state: State::Idle, state: State::Idle,
target, target,
error: None,
} }
} }
@ -58,6 +66,7 @@ where
mk_service, mk_service,
state: State::Connected(init_conn), state: State::Connected(init_conn),
target, target,
error: None,
} }
} }
} }
@ -67,24 +76,20 @@ where
M: Service<Target, Response = S>, M: Service<Target, Response = S>,
S: Service<Request>, S: Service<Request>,
M::Future: Unpin, M::Future: Unpin,
M::Error: Into<Error>, Error: From<M::Error> + From<S::Error>,
S::Error: Into<Error>,
Target: Clone, Target: Clone,
{ {
type Response = S::Response; type Response = S::Response;
type Error = Error; type Error = Error;
type Future = ResponseFuture<S::Future>; type Future = ResponseFuture<S::Future, M::Error>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ret;
let mut state;
loop { loop {
match self.state { match &mut self.state {
State::Idle => { State::Idle => {
trace!("poll_ready; idle"); trace!("poll_ready; idle");
match self.mk_service.poll_ready(cx) { match self.mk_service.poll_ready(cx) {
Poll::Ready(r) => r.map_err(Into::into)?, Poll::Ready(r) => r?,
Poll::Pending => { Poll::Pending => {
trace!("poll_ready; MakeService not ready"); trace!("poll_ready; MakeService not ready");
return Poll::Pending; return Poll::Pending;
@ -99,7 +104,7 @@ where
trace!("poll_ready; connecting"); trace!("poll_ready; connecting");
match Pin::new(f).poll(cx) { match Pin::new(f).poll(cx) {
Poll::Ready(Ok(service)) => { Poll::Ready(Ok(service)) => {
state = State::Connected(service); self.state = State::Connected(service);
} }
Poll::Pending => { Poll::Pending => {
trace!("poll_ready; not ready"); trace!("poll_ready; not ready");
@ -107,8 +112,8 @@ where
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
trace!("poll_ready; error"); trace!("poll_ready; error");
state = State::Idle; self.state = State::Idle;
ret = Err(e.into()); self.error = Some(e.into());
break; break;
} }
} }
@ -126,20 +131,21 @@ where
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
trace!("poll_ready; error"); trace!("poll_ready; error");
state = State::Idle; self.state = State::Idle;
} }
} }
} }
} }
self.state = state;
} }
self.state = state; Poll::Ready(Ok(()))
Poll::Ready(ret)
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {
if let Some(error) = self.error.take() {
return ResponseFuture::error(error);
}
let service = match self.state { let service = match self.state {
State::Connected(ref mut service) => service, State::Connected(ref mut service) => service,
_ => panic!("service not ready; poll_ready must be called first"), _ => panic!("service not ready; poll_ready must be called first"),