reconnect: Fix maker error to return in future

This commit is contained in:
Lucio Franco 2019-12-11 13:03:25 -05:00
parent 1843416dfe
commit 6ab5386999
2 changed files with 48 additions and 21 deletions

View File

@ -1,5 +1,5 @@
use crate::Error;
use pin_project::pin_project;
use pin_project::{pin_project, project};
use std::{
future::Future,
pin::Pin,
@ -9,25 +9,51 @@ use std::{
/// Future that resolves to the response or failure to connect.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<F> {
pub struct ResponseFuture<F, E> {
#[pin]
inner: F,
inner: Inner<F, E>,
}
impl<F> ResponseFuture<F> {
#[pin_project]
#[derive(Debug)]
enum Inner<F, E> {
Future(#[pin] F),
Error(Option<E>),
}
impl<F, E> ResponseFuture<F, E> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture { inner }
ResponseFuture {
inner: Inner::Future(inner),
}
}
pub(crate) fn error(error: E) -> Self {
ResponseFuture {
inner: Inner::Error(Some(error)),
}
}
}
impl<F, T, E> Future for ResponseFuture<F>
impl<F, T, E, ME> Future for ResponseFuture<F, ME>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
ME: Into<Error>,
{
type Output = Result<T, Error>;
#[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map_err(Into::into)
//self.project().inner.poll(cx).map_err(Into::into)
let me = self.project();
#[project]
match me.inner.project() {
Inner::Future(fut) => fut.poll(cx).map_err(Into::into),
Inner::Error(e) => {
let e = e.take().expect("Polled after ready.").into();
Poll::Ready(Err(e))
}
}
}
}

View File

@ -24,9 +24,10 @@ where
mk_service: M,
state: State<M::Future, M::Response>,
target: Target,
error: Option<M::Error>,
}
type Error = Box<dyn std::error::Error + Send + Sync>;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(Debug)]
enum State<F, S> {
@ -51,6 +52,7 @@ where
mk_service,
state: State::Idle,
target,
error: None,
}
}
@ -60,6 +62,7 @@ where
mk_service,
state: State::Connected(init_conn),
target,
error: None,
}
}
}
@ -74,14 +77,11 @@ where
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future>;
type Future = ResponseFuture<S::Future, M::Error>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ret;
let mut state;
loop {
match self.state {
match &mut self.state {
State::Idle => {
trace!("poll_ready; idle");
match self.mk_service.poll_ready(cx) {
@ -100,7 +100,7 @@ where
trace!("poll_ready; connecting");
match Pin::new(f).poll(cx) {
Poll::Ready(Ok(service)) => {
state = State::Connected(service);
self.state = State::Connected(service);
}
Poll::Pending => {
trace!("poll_ready; not ready");
@ -108,8 +108,8 @@ where
}
Poll::Ready(Err(e)) => {
trace!("poll_ready; error");
state = State::Idle;
ret = Err(e.into());
self.state = State::Idle;
self.error = Some(e.into());
break;
}
}
@ -127,20 +127,21 @@ where
}
Poll::Ready(Err(_)) => {
trace!("poll_ready; error");
state = State::Idle;
self.state = State::Idle;
}
}
}
}
self.state = state;
}
self.state = state;
Poll::Ready(ret)
Poll::Ready(Ok(()))
}
fn call(&mut self, request: Request) -> Self::Future {
if let Some(error) = self.error.take() {
return ResponseFuture::error(error);
}
let service = match self.state {
State::Connected(ref mut service) => service,
_ => panic!("service not ready; poll_ready must be called first"),