Refresh Reconnect (#185)

This commit is contained in:
Carl Lerche 2019-03-08 08:46:12 -08:00 committed by GitHub
parent 4c3742e41b
commit e0d6d5b2f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 83 deletions

View File

@ -0,0 +1,25 @@
use futures::{Future, Poll};
use Error;
pub struct ResponseFuture<F> {
inner: F,
}
impl<F> ResponseFuture<F> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture { inner }
}
}
impl<F> Future for ResponseFuture<F>
where
F: Future,
F::Error: Into<Error>,
{
type Item = F::Item;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(Into::into)
}
}

View File

@ -4,11 +4,15 @@ extern crate log;
extern crate tower_service; extern crate tower_service;
extern crate tower_util; extern crate tower_util;
pub mod future;
use crate::future::ResponseFuture;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tower_service::Service; use tower_service::Service;
use tower_util::MakeService; use tower_util::MakeService;
use std::{error, fmt, marker::PhantomData}; use std::fmt;
pub struct Reconnect<M, Target> pub struct Reconnect<M, Target>
where where
@ -19,16 +23,7 @@ where
target: Target, target: Target,
} }
#[derive(Debug)] type Error = Box<::std::error::Error + Send + Sync>;
pub enum Error<T, U> {
Service(T),
Connect(U),
}
pub struct ResponseFuture<F, E> {
inner: F,
_connect_error_marker: PhantomData<fn() -> E>,
}
#[derive(Debug)] #[derive(Debug)]
enum State<F, S> { enum State<F, S> {
@ -37,14 +32,17 @@ enum State<F, S> {
Connected(S), Connected(S),
} }
// ===== impl Reconnect =====
impl<M, Target> Reconnect<M, Target> impl<M, Target> Reconnect<M, Target>
where where
M: Service<Target>, M: Service<Target>,
Target: Clone,
{ {
pub fn new(mk_service: M, target: Target) -> Self { pub fn new<S, Request>(mk_service: M, target: Target) -> Self
where
M: Service<Target, Response = S>,
S: Service<Request>,
Error: From<M::Error> + From<S::Error>,
Target: Clone,
{
Reconnect { Reconnect {
mk_service, mk_service,
state: State::Idle, state: State::Idle,
@ -57,11 +55,12 @@ impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
where where
M: Service<Target, Response = S>, M: Service<Target, Response = S>,
S: Service<Request>, S: Service<Request>,
Error: From<M::Error> + From<S::Error>,
Target: Clone, Target: Clone,
{ {
type Response = S::Response; type Response = S::Response;
type Error = Error<S::Error, M::Error>; type Error = Error;
type Future = ResponseFuture<S::Future, M::Error>; type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let ret; let ret;
@ -71,16 +70,12 @@ where
match self.state { match self.state {
State::Idle => { State::Idle => {
trace!("poll_ready; idle"); trace!("poll_ready; idle");
match self.mk_service.poll_ready() { match self.mk_service.poll_ready()? {
Ok(Async::Ready(())) => (), Async::Ready(()) => (),
Ok(Async::NotReady) => { Async::NotReady => {
trace!("poll_ready; MakeService not ready"); trace!("poll_ready; MakeService not ready");
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(e) => {
trace!("poll_ready; MakeService error");
return Err(Error::Connect(e));
}
} }
let fut = self.mk_service.make_service(self.target.clone()); let fut = self.mk_service.make_service(self.target.clone());
@ -100,7 +95,7 @@ where
Err(e) => { Err(e) => {
trace!("poll_ready; error"); trace!("poll_ready; error");
state = State::Idle; state = State::Idle;
ret = Err(Error::Connect(e)); ret = Err(e.into());
break; break;
} }
} }
@ -157,61 +152,3 @@ where
.finish() .finish()
} }
} }
// ===== impl ResponseFuture =====
impl<F, E> ResponseFuture<F, E> {
fn new(inner: F) -> Self {
ResponseFuture {
inner,
_connect_error_marker: PhantomData,
}
}
}
impl<F, E> Future for ResponseFuture<F, E>
where
F: Future,
{
type Item = F::Item;
type Error = Error<F::Error, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(Error::Service)
}
}
// ===== impl Error =====
impl<T, U> fmt::Display for Error<T, U>
where
T: fmt::Display,
U: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Service(ref why) => fmt::Display::fmt(why, f),
Error::Connect(ref why) => write!(f, "connection failed: {}", why),
}
}
}
impl<T, U> error::Error for Error<T, U>
where
T: error::Error,
U: error::Error,
{
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Service(ref why) => Some(why),
Error::Connect(ref why) => Some(why),
}
}
fn description(&self) -> &str {
match *self {
Error::Service(_) => "inner service error",
Error::Connect(_) => "connection failed",
}
}
}