balance: update to Box<dyn Error> (#188)

This commit is contained in:
Sean McArthur 2019-03-07 15:11:19 -08:00 committed by GitHub
parent 1e38ee6e1f
commit 4c3742e41b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 76 deletions

View File

@ -0,0 +1,18 @@
use std::fmt;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug)]
pub struct Balance(pub(crate) Error);
impl fmt::Display for Balance {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "load balancing discover error: {}", self.0)
}
}
impl std::error::Error for Balance {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&*self.0)
}
}

View File

@ -13,16 +13,18 @@ extern crate tower_service;
use futures::{Async, Future, Poll};
use indexmap::IndexMap;
use rand::{rngs::SmallRng, SeedableRng};
use std::marker::PhantomData;
use std::{error, fmt};
use std::fmt;
use tower_discover::Discover;
use tower_service::Service;
pub mod choose;
pub mod error;
pub mod load;
pub use choose::Choose;
pub use load::Load;
pub use self::choose::Choose;
pub use self::load::Load;
use self::error::Error;
/// Balances requests across a set of inner services.
#[derive(Debug)]
@ -48,14 +50,7 @@ pub struct Balance<D: Discover, C> {
not_ready: IndexMap<D::Key, D::Service>,
}
/// Error produced by `Balance`
#[derive(Debug)]
pub enum Error<T, U> {
Inner(T),
Balance(U),
}
pub struct ResponseFuture<F: Future, E>(F, PhantomData<E>);
pub struct ResponseFuture<F>(F);
// ===== impl Balance =====
@ -146,15 +141,24 @@ where
pub fn num_not_ready(&self) -> usize {
self.not_ready.len()
}
}
impl<D, C> Balance<D, C>
where
D: Discover,
D::Error: Into<Error>,
C: Choose<D::Key, D::Service>,
{
/// Polls `discover` for updates, adding new items to `not_ready`.
///
/// Removals may alter the order of either `ready` or `not_ready`.
fn update_from_discover<E>(&mut self) -> Result<(), Error<E, D::Error>> {
fn update_from_discover(&mut self) -> Result<(), error::Balance> {
debug!("updating from discover");
use tower_discover::Change::*;
while let Async::Ready(change) = self.discover.poll().map_err(Error::Balance)? {
while let Async::Ready(change) =
self.discover.poll().map_err(|e| error::Balance(e.into()))?
{
match change {
Insert(key, mut svc) => {
// If the `Insert`ed service is a duplicate of a service already
@ -184,9 +188,7 @@ where
///
/// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
/// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
fn promote_to_ready<Request>(
&mut self,
) -> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
fn promote_to_ready<Request>(&mut self) -> Result<(), <D::Service as Service<Request>>::Error>
where
D::Service: Service<Request>,
{
@ -205,7 +207,7 @@ where
.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
svc.poll_ready().map_err(Error::Inner)?.is_ready()
svc.poll_ready()?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
@ -232,7 +234,7 @@ where
fn poll_ready_index<Request>(
&mut self,
idx: usize,
) -> Option<Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>>
) -> Option<Poll<(), <D::Service as Service<Request>>::Error>>
where
D::Service: Service<Request>,
{
@ -240,7 +242,7 @@ where
None => return None,
Some((_, svc)) => match svc.poll_ready() {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Err(e) => return Some(Err(e)),
Ok(Async::NotReady) => {}
},
}
@ -258,7 +260,7 @@ where
/// Ensures that .
fn choose_and_poll_ready<Request>(
&mut self,
) -> Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
) -> Poll<(), <D::Service as Service<Request>>::Error>
where
D::Service: Service<Request>,
{
@ -287,15 +289,17 @@ where
}
}
impl<D, C, Request> Service<Request> for Balance<D, C>
impl<D, C, Svc, Request> Service<Request> for Balance<D, C>
where
D: Discover,
D::Service: Service<Request>,
C: Choose<D::Key, D::Service>,
D: Discover<Service = Svc>,
D::Error: Into<Error>,
Svc: Service<Request>,
Svc::Error: Into<Error>,
C: Choose<D::Key, Svc>,
{
type Response = <D::Service as Service<Request>>::Response;
type Error = Error<<D::Service as Service<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as Service<Request>>::Future, D::Error>;
type Response = Svc::Response;
type Error = Error;
type Future = ResponseFuture<Svc::Future>;
/// Prepares the balancer to process a request.
///
@ -310,15 +314,16 @@ where
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx)
.expect("invalid dispatched ready key")?;
.expect("invalid dispatched ready key")
.map_err(Into::into)?;
}
// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready()?;
self.promote_to_ready().map_err(Into::into)?;
// Choose the next service to be used by `call`.
self.choose_and_poll_ready()
self.choose_and_poll_ready().map_err(Into::into)
}
fn call(&mut self, request: Request) -> Self::Future {
@ -330,53 +335,22 @@ where
self.dispatched_ready_index = Some(idx);
let rsp = svc.call(request);
ResponseFuture(rsp, PhantomData)
ResponseFuture(rsp)
}
}
// ===== impl ResponseFuture =====
impl<F: Future, E> Future for ResponseFuture<F, E> {
impl<F> Future for ResponseFuture<F>
where
F: Future,
F::Error: Into<Error>,
{
type Item = F::Item;
type Error = Error<F::Error, E>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(Error::Inner)
}
}
// ===== impl Error =====
impl<T, U> fmt::Display for Error<T, U>
where
T: fmt::Display,
U: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Inner(ref why) => fmt::Display::fmt(why, f),
Error::Balance(ref why) => write!(f, "load balancing failed: {}", why),
}
}
}
impl<T, U> error::Error for Error<T, U>
where
T: error::Error,
U: error::Error,
{
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Inner(ref why) => Some(why),
Error::Balance(ref why) => Some(why),
}
}
fn description(&self) -> &str {
match *self {
Error::Inner(_) => "inner service error",
Error::Balance(_) => "load balancing failed",
}
self.0.poll().map_err(Into::into)
}
}
@ -398,7 +372,7 @@ mod tests {
impl Discover for ReluctantDisco {
type Key = usize;
type Service = ReluctantService;
type Error = ();
type Error = Error;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
let r = self
@ -413,10 +387,10 @@ mod tests {
impl Service<()> for ReluctantService {
type Response = ();
type Error = ();
type Future = future::FutureResult<(), ()>;
type Error = Error;
type Future = future::FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), ()> {
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.polls_until_ready == 0 {
return Ok(Async::Ready(()));
}

View File

@ -12,6 +12,7 @@ extern crate tower_service;
use futures::{Async, Poll, Stream};
use tower_service::Service;
use std::fmt;
use std::hash::Hash;
use std::iter::{Enumerate, IntoIterator};
use std::marker::PhantomData;
@ -51,7 +52,6 @@ where
{
inner: Enumerate<T::IntoIter>,
}
// ===== impl List =====
impl<T, U> List<T>
@ -74,7 +74,7 @@ where
{
type Key = usize;
type Service = U;
type Error = ();
type Error = Never;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
match self.inner.next() {
@ -129,6 +129,18 @@ where
}
}
#[doc(hidden)]
#[derive(Debug)]
pub enum Never {}
impl fmt::Display for Never {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
match *self {}
}
}
impl std::error::Error for Never {}
// check that List can be directly over collections
#[cfg(test)]
#[allow(dead_code)]