diff --git a/tower/src/lib.rs b/tower/src/lib.rs index 2be5211..d1e3672 100644 --- a/tower/src/lib.rs +++ b/tower/src/lib.rs @@ -51,8 +51,12 @@ pub mod builder; pub mod layer; #[cfg(feature = "util")] +#[doc(inline)] pub use self::util::{service_fn, ServiceExt}; +#[doc(inline)] pub use crate::builder::ServiceBuilder; +#[doc(inline)] pub use tower_layer::Layer; +#[doc(inline)] pub use tower_service::Service; diff --git a/tower/src/reconnect/future.rs b/tower/src/reconnect/future.rs index b1d2c74..8ff9a96 100644 --- a/tower/src/reconnect/future.rs +++ b/tower/src/reconnect/future.rs @@ -1,5 +1,5 @@ use super::Error; -use pin_project::pin_project; +use pin_project::{pin_project, project}; use std::{ future::Future, pin::Pin, @@ -9,25 +9,50 @@ use std::{ /// Future that resolves to the response or failure to connect. #[pin_project] #[derive(Debug)] -pub struct ResponseFuture { +pub struct ResponseFuture { #[pin] - inner: F, + inner: Inner, } -impl ResponseFuture { +#[pin_project] +#[derive(Debug)] +enum Inner { + Future(#[pin] F), + Error(Option), +} + +impl ResponseFuture { pub(crate) fn new(inner: F) -> Self { - ResponseFuture { inner } + ResponseFuture { + inner: Inner::Future(inner), + } + } + + pub(crate) fn error(error: E) -> Self { + ResponseFuture { + inner: Inner::Error(Some(error)), + } } } -impl Future for ResponseFuture +impl Future for ResponseFuture where F: Future>, E: Into, + ME: Into, { type Output = Result; + #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx).map_err(Into::into) + let me = self.project(); + #[project] + match me.inner.project() { + Inner::Future(fut) => fut.poll(cx).map_err(Into::into), + Inner::Error(e) => { + let e = e.take().expect("Polled after ready.").into(); + Poll::Ready(Err(e)) + } + } } } diff --git a/tower/src/reconnect/mod.rs b/tower/src/reconnect/mod.rs index 47b9a3b..272579b 100644 --- a/tower/src/reconnect/mod.rs +++ b/tower/src/reconnect/mod.rs @@ -1,8 +1,21 @@ -#![allow(missing_docs)] +//! Reconnect services when they fail. +//! +//! Reconnect takes some [`MakeService`] and transforms it into a +//! [`Service`]. It then attempts to lazily connect and +//! reconnect on failure. The `Reconnect` service becomes unavailable +//! when the inner `MakeService::poll_ready` returns an error. When the +//! connection future returned from `MakeService::call` fails this will be +//! returned in the next call to `Reconnect::call`. This allows the user to +//! call the service again even if the inner `MakeService` was unable to +//! connect on the last call. +//! +//! [`MakeService`]: ../make/trait.MakeService.html +//! [`Service`]: ../trait.Service.html -pub mod future; +mod future; + +pub use future::ResponseFuture; -use self::future::ResponseFuture; use crate::make::MakeService; use std::fmt; use std::{ @@ -13,6 +26,8 @@ use std::{ use tower_service::Service; use tracing::trace; +pub(crate) type Error = Box; + /// Reconnect to failed services. pub struct Reconnect where @@ -21,10 +36,9 @@ where mk_service: M, state: State, target: Target, + error: Option, } -type Error = Box; - #[derive(Debug)] enum State { Idle, @@ -37,18 +51,12 @@ where M: Service, { /// Lazily connect and reconnect to a Service. - pub fn new(mk_service: M, target: Target) -> Self - where - M: Service, - S: Service, - M::Error: Into, - S::Error: Into, - Target: Clone, - { + pub fn new(mk_service: M, target: Target) -> Self { Reconnect { mk_service, state: State::Idle, target, + error: None, } } @@ -58,6 +66,7 @@ where mk_service, state: State::Connected(init_conn), target, + error: None, } } } @@ -67,24 +76,20 @@ where M: Service, S: Service, M::Future: Unpin, - M::Error: Into, - S::Error: Into, + Error: From + From, Target: Clone, { type Response = S::Response; type Error = Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let ret; - let mut state; - loop { - match self.state { + match &mut self.state { State::Idle => { trace!("poll_ready; idle"); match self.mk_service.poll_ready(cx) { - Poll::Ready(r) => r.map_err(Into::into)?, + Poll::Ready(r) => r?, Poll::Pending => { trace!("poll_ready; MakeService not ready"); return Poll::Pending; @@ -99,7 +104,7 @@ where trace!("poll_ready; connecting"); match Pin::new(f).poll(cx) { Poll::Ready(Ok(service)) => { - state = State::Connected(service); + self.state = State::Connected(service); } Poll::Pending => { trace!("poll_ready; not ready"); @@ -107,8 +112,8 @@ where } Poll::Ready(Err(e)) => { trace!("poll_ready; error"); - state = State::Idle; - ret = Err(e.into()); + self.state = State::Idle; + self.error = Some(e.into()); break; } } @@ -126,20 +131,21 @@ where } Poll::Ready(Err(_)) => { trace!("poll_ready; error"); - state = State::Idle; + self.state = State::Idle; } } } } - - self.state = state; } - self.state = state; - Poll::Ready(ret) + Poll::Ready(Ok(())) } fn call(&mut self, request: Request) -> Self::Future { + if let Some(error) = self.error.take() { + return ResponseFuture::error(error); + } + let service = match self.state { State::Connected(ref mut service) => service, _ => panic!("service not ready; poll_ready must be called first"),